@ -1,7 +1,7 @@
/*****************************************************************************
Copyright ( c ) 2011 , 2018 , Oracle and / or its affiliates . All Rights Reserved .
Copyright ( c ) 2017 , 2021 , MariaDB Corporation .
Copyright ( c ) 2017 , 2022 , MariaDB Corporation .
This program is free software ; you can redistribute it and / or modify it under
the terms of the GNU General Public License as published by the Free Software
@ -89,79 +89,6 @@ struct row_log_buf_t {
row_log_apply ( ) . */
} ;
/** Tracks the free/alloc state of one off-page (BLOB) column page chain
during online table-rebuilding ALTER TABLE. A map entry is created the
first time the page is freed; later allocations and frees update it. */
class row_log_table_blob_t {
public:
	/** Constructor (declaring a BLOB freed)
	@param offset_arg	row_log_t::tail::total */
#ifdef UNIV_DEBUG
	row_log_table_blob_t(ulonglong offset_arg) :
		old_offset(0), free_offset(offset_arg),
		offset(BLOB_FREED) {}
#else /* UNIV_DEBUG */
	row_log_table_blob_t() :
		offset(BLOB_FREED) {}
#endif /* UNIV_DEBUG */

	/** Declare a BLOB freed again.
	@param offset_arg	row_log_t::tail::total */
#ifdef UNIV_DEBUG
	void blob_free(ulonglong offset_arg)
#else /* UNIV_DEBUG */
	void blob_free()
#endif /* UNIV_DEBUG */
	{
		ut_ad(offset < offset_arg);
		ut_ad(offset != BLOB_FREED);
		ut_d(old_offset = offset);
		ut_d(free_offset = offset_arg);
		offset = BLOB_FREED;
	}

	/** Declare a freed BLOB reused.
	@param offset_arg	row_log_t::tail::total */
	void blob_alloc(ulonglong offset_arg) {
		ut_ad(free_offset <= offset_arg);
		ut_d(old_offset = offset);
		offset = offset_arg;
	}

	/** Determine if a BLOB was freed at a given log position
	@param offset_arg	row_log_t::head::total after the log record
	@return true if freed */
	bool is_freed(ulonglong offset_arg) const {
		/* This is supposed to be the offset at the end of the
		current log record. */
		ut_ad(offset_arg > 0);
		/* We should never get anywhere close the magic value. */
		ut_ad(offset_arg < BLOB_FREED);
		return(offset_arg < offset);
	}

private:
	/** Magic value for a freed BLOB */
	static const ulonglong BLOB_FREED = ~0ULL;
#ifdef UNIV_DEBUG
	/** Old offset, in case a page was freed, reused, freed, ... */
	ulonglong	old_offset;
	/** Offset of the latest blob_free() */
	ulonglong	free_offset;
#endif /* UNIV_DEBUG */
	/** Byte offset into the log file; BLOB_FREED while the page
	chain is free, otherwise the log position of the reallocation */
	ulonglong	offset;
};
/** @brief Map of off-page column page numbers to 0 or log byte offsets.
If there is no mapping for a page number , it is safe to access .
If a page number maps to 0 , it is an off - page column that has been freed .
If a page number maps to a nonzero number , the number is a byte offset
into the index - > online_log , indicating that the page is safe to access
when applying log records starting from that offset . */
typedef std : : map <
ulint ,
row_log_table_blob_t ,
std : : less < ulint > ,
ut_allocator < std : : pair < const ulint , row_log_table_blob_t > > >
page_no_map ;
/** @brief Buffer for logging modifications during online index creation
All modifications to an index that is being created will be logged by
@ -178,10 +105,6 @@ struct row_log_t {
pfs_os_file_t fd ; /*!< file descriptor */
mysql_mutex_t mutex ; /*!< mutex protecting error,
max_trx and tail */
page_no_map * blobs ; /*!< map of page numbers of off-page columns
that have been freed during table - rebuilding
ALTER TABLE ( row_log_table_ * ) ; protected by
index - > lock X - latch only */
dict_table_t * table ; /*!< table that is being rebuilt,
or NULL when this is a secondary
index that is being created online */
@ -241,6 +164,11 @@ struct row_log_t {
const TABLE * old_table ; /*< Use old table in case of error. */
uint64_t n_rows ; /*< Number of rows read from the table */
/** Alter table transaction. It can be used to apply the DML logs
into the table */
const trx_t * alter_trx ;
/** Determine whether the log should be in the 'instant ADD' format
@ param [ in ] index the clustered index of the source table
@ return whether to use the ' instant ADD COLUMN ' format */
@ -334,8 +262,6 @@ static void row_log_empty(dict_index_t *index)
row_log_t * log = index - > online_log ;
mysql_mutex_lock ( & log - > mutex ) ;
UT_DELETE ( log - > blobs ) ;
log - > blobs = nullptr ;
row_log_block_free ( log - > tail ) ;
row_log_block_free ( log - > head ) ;
row_merge_file_destroy_low ( log - > fd ) ;
@ -345,15 +271,14 @@ static void row_log_empty(dict_index_t *index)
mysql_mutex_unlock ( & log - > mutex ) ;
}
/******************************************************/ /**
Logs an operation to a secondary index that is ( or was ) being created . */
void
row_log_online_op (
/*==============*/
dict_index_t * index , /*!< in/out: index, S or X latched */
const dtuple_t * tuple , /*!< in: index tuple (NULL=empty the index) */
trx_id_t trx_id ) /*!< in: transaction ID for insert,
or 0 for delete */
/** Logs an operation to a secondary index that is (or was) being created.
@ param index index , S or X latched
@ param tuple index tuple ( NULL = empty the index )
@ param trx_id transaction ID for insert , or 0 for delete
@ retval false if row_log_apply ( ) failure happens
or true otherwise */
bool row_log_online_op ( dict_index_t * index , const dtuple_t * tuple ,
trx_id_t trx_id )
{
byte * b ;
ulint extra_size ;
@ -361,16 +286,19 @@ row_log_online_op(
ulint mrec_size ;
ulint avail_size ;
row_log_t * log ;
bool success = true ;
ut_ad ( ! tuple | | dtuple_validate ( tuple ) ) ;
ut_ad ( ! tuple | | dtuple_get_n_fields ( tuple ) = = dict_index_get_n_fields ( index ) ) ;
ut_ad ( index - > lock . have_x ( ) | | index - > lock . have_s ( ) ) ;
if ( index - > is_corrupted ( ) ) {
return ;
return success ;
}
ut_ad ( dict_index_is_online_ddl ( index ) ) ;
ut_ad ( dict_index_is_online_ddl ( index )
| | ( index - > online_log
& & index - > online_status = = ONLINE_INDEX_COMPLETE ) ) ;
/* Compute the size of the record. This differs from
row_merge_buf_encode ( ) , because here we do not encode
@ -395,6 +323,7 @@ row_log_online_op(
log = index - > online_log ;
mysql_mutex_lock ( & log - > mutex ) ;
start_log :
if ( trx_id > log - > max_trx ) {
log - > max_trx = trx_id ;
}
@ -450,7 +379,28 @@ row_log_online_op(
byte * buf = log - > tail . block ;
if ( byte_offset + srv_sort_buf_size > = srv_online_max_size ) {
goto write_failed ;
if ( index - > online_status ! = ONLINE_INDEX_COMPLETE )
goto write_failed ;
/* About to run out of log, InnoDB has to
apply the online log for the completed index */
index - > lock . s_unlock ( ) ;
dberr_t error = row_log_apply (
log - > alter_trx , index , nullptr , nullptr ) ;
index - > lock . s_lock ( SRW_LOCK_CALL ) ;
if ( error ! = DB_SUCCESS ) {
/* Mark all newly added indexes
as corrupted */
log - > error = error ;
success = false ;
goto err_exit ;
}
/* Recheck whether the index online log */
if ( ! index - > online_log ) {
goto err_exit ;
}
goto start_log ;
}
if ( mrec_size = = avail_size ) {
@ -510,6 +460,7 @@ write_failed:
MEM_UNDEFINED ( log - > tail . buf , sizeof log - > tail . buf ) ;
err_exit :
mysql_mutex_unlock ( & log - > mutex ) ;
return success ;
}
/******************************************************/ /**
@ -833,7 +784,6 @@ row_log_table_low_redundant(
dtuple_t * tuple ;
const ulint n_fields = rec_get_n_fields_old ( rec ) ;
ut_ad ( ! page_is_comp ( page_align ( rec ) ) ) ;
ut_ad ( index - > n_fields > = n_fields ) ;
ut_ad ( index - > n_fields = = n_fields | | index - > is_instant ( ) ) ;
ut_ad ( dict_tf2_is_valid ( index - > table - > flags , index - > table - > flags2 ) ) ;
@ -994,22 +944,6 @@ row_log_table_low(
ut_ad ( rec_offs_size ( offsets ) < = sizeof log - > tail . buf ) ;
ut_ad ( index - > lock . have_any ( ) ) ;
# ifdef UNIV_DEBUG
switch ( fil_page_get_type ( page_align ( rec ) ) ) {
case FIL_PAGE_INDEX :
break ;
case FIL_PAGE_TYPE_INSTANT :
ut_ad ( index - > is_instant ( ) ) ;
ut_ad ( ! page_has_siblings ( page_align ( rec ) ) ) ;
ut_ad ( page_get_page_no ( page_align ( rec ) ) = = index - > page ) ;
break ;
default :
ut_ad ( " wrong page type " = = 0 ) ;
}
# endif /* UNIV_DEBUG */
ut_ad ( ! rec_is_metadata ( rec , * index ) ) ;
ut_ad ( page_rec_is_leaf ( rec ) ) ;
ut_ad ( ! page_is_comp ( page_align ( rec ) ) = = ! rec_offs_comp ( offsets ) ) ;
/* old_pk=row_log_table_get_pk() [not needed in INSERT] is a prefix
of the clustered index record ( PRIMARY KEY , DB_TRX_ID , DB_ROLL_PTR ) ,
with no information on virtual columns */
@ -1028,7 +962,6 @@ row_log_table_low(
return ;
}
ut_ad ( page_is_comp ( page_align ( rec ) ) ) ;
ut_ad ( rec_get_status ( rec ) = = REC_STATUS_ORDINARY
| | rec_get_status ( rec ) = = REC_STATUS_INSTANT ) ;
@ -1470,78 +1403,6 @@ row_log_table_insert(
row_log_table_low ( rec , index , offsets , true , NULL ) ;
}
/******************************************************//**
Notes that a BLOB is being freed during online ALTER TABLE. */
void
row_log_table_blob_free(
/*====================*/
	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
	ulint		page_no)/*!< in: starting page number of the BLOB */
{
	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_is_online_ddl(index));
	ut_ad(index->lock.have_u_or_x());
	ut_ad(page_no != FIL_NULL);

	/* Once the log is in an error state, tracking is pointless:
	the ALTER TABLE is going to fail anyway. */
	if (index->online_log->error != DB_SUCCESS) {
		return;
	}

	page_no_map*	blobs = index->online_log->blobs;

	/* Create the tracking map lazily on the first freed BLOB. */
	if (blobs == NULL) {
		index->online_log->blobs = blobs = UT_NEW_NOKEY(page_no_map());
	}

#ifdef UNIV_DEBUG
	const ulonglong	log_pos = index->online_log->tail.total;
#else
# define log_pos /* empty */
#endif /* UNIV_DEBUG */

	const page_no_map::value_type v(page_no,
					row_log_table_blob_t(log_pos));

	std::pair<page_no_map::iterator, bool> p = blobs->insert(v);

	if (!p.second) {
		/* The page was tracked before: update the existing
		mapping to record this (repeated) free. */
		ut_ad(p.first->first == page_no);
		p.first->second.blob_free(log_pos);
	}
#undef log_pos
}
/******************************************************//**
Notes that a BLOB is being allocated during online ALTER TABLE. */
void
row_log_table_blob_alloc(
/*=====================*/
	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
	ulint		page_no)/*!< in: starting page number of the BLOB */
{
	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_is_online_ddl(index));
	ut_ad(index->lock.have_u_or_x());
	ut_ad(page_no != FIL_NULL);

	/* Once the log is in an error state, tracking is pointless. */
	if (index->online_log->error != DB_SUCCESS) {
		return;
	}

	/* Only track allocations if the same page has been freed
	earlier. Double allocation without a free is not allowed. */
	if (page_no_map* blobs = index->online_log->blobs) {
		page_no_map::iterator p = blobs->find(page_no);

		if (p != blobs->end()) {
			ut_ad(p->first == page_no);
			p->second.blob_alloc(index->online_log->tail.total);
		}
	}
}
/******************************************************/ /**
Converts a log record to a table row .
@ return converted row , or NULL if the conversion fails */
@ -1618,34 +1479,13 @@ row_log_table_apply_convert_mrec(
ut_ad ( rec_offs_any_extern ( offsets ) ) ;
index - > lock . x_lock ( SRW_LOCK_CALL ) ;
if ( const page_no_map * blobs = log - > blobs ) {
data = rec_get_nth_field (
mrec , offsets , i , & len ) ;
ut_ad ( len > = BTR_EXTERN_FIELD_REF_SIZE ) ;
ulint page_no = mach_read_from_4 (
data + len - ( BTR_EXTERN_FIELD_REF_SIZE
- BTR_EXTERN_PAGE_NO ) ) ;
page_no_map : : const_iterator p = blobs - > find (
page_no ) ;
if ( p ! = blobs - > end ( )
& & p - > second . is_freed ( log - > head . total ) ) {
/* This BLOB has been freed.
We must not access the row . */
* error = DB_MISSING_HISTORY ;
dfield_set_data ( dfield , data , len ) ;
dfield_set_ext ( dfield ) ;
goto blob_done ;
}
}
data = btr_rec_copy_externally_stored_field (
mrec , offsets ,
index - > table - > space - > zip_size ( ) ,
i , & len , heap ) ;
ut_a ( data ) ;
dfield_set_data ( dfield , data , len ) ;
blob_done :
index - > lock . x_unlock ( ) ;
} else {
data = rec_get_nth_field ( mrec , offsets , i , & len ) ;
@ -1693,6 +1533,12 @@ blob_done:
if ( ( new_col - > prtype & DATA_NOT_NULL )
& & dfield_is_null ( dfield ) ) {
if ( ! log - > allow_not_null ) {
/* We got a NULL value for a NOT NULL column. */
* error = DB_INVALID_NULL ;
return NULL ;
}
const dfield_t & default_field
= log - > defaults - > fields [ col_no ] ;
@ -1702,12 +1548,6 @@ blob_done:
WARN_DATA_TRUNCATED , 1 ,
ulong ( log - > n_rows ) ) ;
if ( ! log - > allow_not_null ) {
/* We got a NULL value for a NOT NULL column. */
* error = DB_INVALID_NULL ;
return NULL ;
}
* dfield = default_field ;
}
@ -1818,15 +1658,6 @@ row_log_table_apply_insert(
mrec , dup - > index , offsets , log , heap , & error ) ;
switch ( error ) {
case DB_MISSING_HISTORY :
ut_ad ( log - > blobs ) ;
/* Because some BLOBs are missing, we know that the
transaction was rolled back later ( a rollback of
an insert can free BLOBs ) .
We can simply skip the insert : the subsequent
ROW_T_DELETE will be ignored , or a ROW_T_UPDATE will
be interpreted as ROW_T_INSERT . */
return ( DB_SUCCESS ) ;
case DB_SUCCESS :
ut_ad ( row ! = NULL ) ;
break ;
@ -2101,20 +1932,6 @@ row_log_table_apply_update(
mrec , dup - > index , offsets , log , heap , & error ) ;
switch ( error ) {
case DB_MISSING_HISTORY :
/* The record contained BLOBs that are now missing. */
ut_ad ( log - > blobs ) ;
/* Whether or not we are updating the PRIMARY KEY, we
know that there should be a subsequent
ROW_T_DELETE for rolling back a preceding ROW_T_INSERT ,
overriding this ROW_T_UPDATE record . ( * 1 )
This allows us to interpret this ROW_T_UPDATE
as ROW_T_DELETE .
When applying the subsequent ROW_T_DELETE , no matching
record will be found . */
/* fall through */
case DB_SUCCESS :
ut_ad ( row ! = NULL ) ;
break ;
@ -2144,79 +1961,16 @@ row_log_table_apply_update(
}
# endif /* UNIV_DEBUG */
if ( page_rec_is_infimum ( btr_pcur_get_rec ( & pcur ) )
| | btr_pcur_get_low_match ( & pcur ) < index - > n_uniq ) {
/* The record was not found. This should only happen
when an earlier ROW_T_INSERT or ROW_T_UPDATE was
diverted because BLOBs were freed when the insert was
later rolled back . */
ut_ad ( log - > blobs ) ;
if ( error = = DB_SUCCESS ) {
/* An earlier ROW_T_INSERT could have been
skipped because of a missing BLOB , like this :
BEGIN ;
INSERT INTO t SET blob_col = ' blob value ' ;
UPDATE t SET blob_col = ' ' ;
ROLLBACK ;
This would generate the following records :
ROW_T_INSERT ( referring to ' blob value ' )
ROW_T_UPDATE
ROW_T_UPDATE ( referring to ' blob value ' )
ROW_T_DELETE
[ ROLLBACK removes the ' blob value ' ]
The ROW_T_INSERT would have been skipped
because of a missing BLOB . Now we are
executing the first ROW_T_UPDATE .
The second ROW_T_UPDATE ( for the ROLLBACK )
would be interpreted as ROW_T_DELETE , because
the BLOB would be missing .
We could probably assume that the transaction
has been rolled back and simply skip the
' insert ' part of this ROW_T_UPDATE record .
However , there might be some complex scenario
that could interfere with such a shortcut .
So , we will insert the row ( and risk
introducing a bogus duplicate key error
for the ALTER TABLE ) , and a subsequent
ROW_T_UPDATE or ROW_T_DELETE will delete it . */
mtr_commit ( & mtr ) ;
error = row_log_table_apply_insert_low (
thr , row , offsets_heap , heap , dup ) ;
} else {
/* Some BLOBs are missing, so we are interpreting
this ROW_T_UPDATE as ROW_T_DELETE ( see * 1 ) .
Because the record was not found , we do nothing . */
ut_ad ( error = = DB_MISSING_HISTORY ) ;
error = DB_SUCCESS ;
func_exit :
mtr_commit ( & mtr ) ;
}
func_exit_committed :
ut_ad ( mtr . has_committed ( ) ) ;
if ( error ! = DB_SUCCESS ) {
/* Report the erroneous row using the new
version of the table . */
innobase_row_to_mysql ( dup - > table , log - > table , row ) ;
}
return ( error ) ;
}
ut_ad ( ! page_rec_is_infimum ( btr_pcur_get_rec ( & pcur ) )
& & btr_pcur_get_low_match ( & pcur ) > = index - > n_uniq ) ;
/* Prepare to update (or delete) the record. */
rec_offs * cur_offsets = rec_get_offsets (
btr_pcur_get_rec ( & pcur ) , index , nullptr , index - > n_core_fields ,
ULINT_UNDEFINED , & offsets_heap ) ;
# ifdef UNIV_DEBUG
if ( ! log - > same_pk ) {
/* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what
was buffered . */
ulint len ;
const byte * rec_trx_id
= rec_get_nth_field ( btr_pcur_get_rec ( & pcur ) ,
@ -2231,60 +1985,29 @@ func_exit_committed:
+ static_cast < const char * > ( old_pk_trx_id - > data )
= = old_pk_trx_id [ 1 ] . data ) ;
ut_d ( trx_id_check ( old_pk_trx_id - > data , log - > min_trx ) ) ;
if ( memcmp ( rec_trx_id , old_pk_trx_id - > data ,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN ) ) {
/* The ROW_T_UPDATE was logged for a different
DB_TRX_ID , DB_ROLL_PTR . This is possible if an
earlier ROW_T_INSERT or ROW_T_UPDATE was diverted
because some BLOBs were missing due to rolling
back the initial insert or due to purging
the old BLOB values of an update . */
ut_ad ( log - > blobs ) ;
if ( error ! = DB_SUCCESS ) {
ut_ad ( error = = DB_MISSING_HISTORY ) ;
/* Some BLOBs are missing, so we are
interpreting this ROW_T_UPDATE as
ROW_T_DELETE ( see * 1 ) .
Because this is a different row ,
we will do nothing . */
error = DB_SUCCESS ;
} else {
/* Because the user record is missing due to
BLOBs that were missing when processing
an earlier log record , we should
interpret the ROW_T_UPDATE as ROW_T_INSERT .
However , there is a different user record
with the same PRIMARY KEY value already . */
error = DB_DUPLICATE_KEY ;
}
goto func_exit ;
}
}
if ( error ! = DB_SUCCESS ) {
ut_ad ( error = = DB_MISSING_HISTORY ) ;
ut_ad ( log - > blobs ) ;
/* Some BLOBs are missing, so we are interpreting
this ROW_T_UPDATE as ROW_T_DELETE ( see * 1 ) . */
error = row_log_table_apply_delete_low (
& pcur , cur_offsets , heap , & mtr ) ;
goto func_exit_committed ;
ut_ad ( ! memcmp ( rec_trx_id , old_pk_trx_id - > data ,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN ) ) ;
}
# endif
dtuple_t * entry = row_build_index_entry_low (
row , NULL , index , heap , ROW_BUILD_NORMAL ) ;
upd_t * update = row_upd_build_difference_binary (
index , entry , btr_pcur_get_rec ( & pcur ) , cur_offsets ,
false , NULL , heap , dup - > table , & error ) ;
if ( error ! = DB_SUCCESS ) {
goto func_exit ;
}
if ( error ! = DB_SUCCESS | | ! update - > n_fields ) {
func_exit :
mtr . commit ( ) ;
func_exit_committed :
ut_ad ( mtr . has_committed ( ) ) ;
if ( ! update - > n_fields ) {
/* Nothing to do. */
goto func_exit ;
if ( error ! = DB_SUCCESS ) {
/* Report the erroneous row using the new
version of the table . */
innobase_row_to_mysql ( dup - > table , log - > table , row ) ;
}
return error ;
}
const bool pk_updated
@ -2739,7 +2462,8 @@ ulint
row_log_estimate_work (
const dict_index_t * index )
{
if ( index = = NULL | | index - > online_log = = NULL ) {
if ( index = = NULL | | index - > online_log = = NULL
| | index - > online_log_is_dummy ( ) ) {
return ( 0 ) ;
}
@ -3230,7 +2954,6 @@ row_log_allocate(
log - > fd = OS_FILE_CLOSED ;
mysql_mutex_init ( index_online_log_key , & log - > mutex , nullptr ) ;
log - > blobs = NULL ;
log - > table = table ;
log - > same_pk = same_pk ;
log - > defaults = defaults ;
@ -3280,6 +3003,15 @@ row_log_allocate(
}
index - > online_log = log ;
if ( ! table ) {
/* Assign the clustered index online log to table.
It can be used by concurrent DML to identify whether
the table has any online DDL */
index - > table - > indexes . start - > online_log_make_dummy ( ) ;
log - > alter_trx = trx ;
}
/* While we might be holding an exclusive data dictionary lock
here , in row_log_abort_sec ( ) we will not always be holding it . Use
atomic operations in both cases . */
@ -3297,7 +3029,6 @@ row_log_free(
{
MONITOR_ATOMIC_DEC ( MONITOR_ONLINE_CREATE_INDEX ) ;
UT_DELETE ( log - > blobs ) ;
UT_DELETE_ARRAY ( log - > non_core_fields ) ;
row_log_block_free ( log - > tail ) ;
row_log_block_free ( log - > head ) ;
@ -3698,7 +3429,8 @@ interrupted)
@ param [ in , out ] dup for reporting duplicate key errors
@ param [ in , out ] stage performance schema accounting object , used by
ALTER TABLE . If not NULL , then stage - > inc ( ) will be called for each block
of log that is applied .
of log that is applied or nullptr when row log applied done by DML
thread .
@ return DB_SUCCESS , or error code on failure */
static
dberr_t
@ -3720,7 +3452,9 @@ row_log_apply_ops(
const ulint i = 1 + REC_OFFS_HEADER_SIZE
+ dict_index_get_n_fields ( index ) ;
ut_ad ( dict_index_is_online_ddl ( index ) ) ;
ut_ad ( dict_index_is_online_ddl ( index )
| | ( index - > online_log
& & index - > online_status = = ONLINE_INDEX_COMPLETE ) ) ;
ut_ad ( ! index - > is_committed ( ) ) ;
ut_ad ( index - > lock . have_x ( ) ) ;
ut_ad ( index - > online_log ) ;
@ -3740,7 +3474,9 @@ next_block:
ut_ad ( index - > lock . have_x ( ) ) ;
ut_ad ( index - > online_log - > head . bytes = = 0 ) ;
stage - > inc ( row_log_progress_inc_per_block ( ) ) ;
if ( stage ) {
stage - > inc ( row_log_progress_inc_per_block ( ) ) ;
}
if ( trx_is_interrupted ( trx ) ) {
goto interrupted ;
@ -3794,6 +3530,8 @@ all_done:
ut_ad ( has_index_lock ) ;
ut_ad ( index - > online_log - > head . blocks = = 0 ) ;
ut_ad ( index - > online_log - > tail . blocks = = 0 ) ;
index - > online_log - > tail . bytes = 0 ;
index - > online_log - > head . bytes = 0 ;
error = DB_SUCCESS ;
goto func_exit ;
}
@ -4029,7 +3767,8 @@ interrupted)
@ param [ in , out ] table MySQL table ( for reporting duplicates )
@ param [ in , out ] stage performance schema accounting object , used by
ALTER TABLE . stage - > begin_phase_log_index ( ) will be called initially and then
stage - > inc ( ) will be called for each block of log that is applied .
stage - > inc ( ) will be called for each block of log that is applied or nullptr
when row log has been applied by DML thread .
@ return DB_SUCCESS , or error code on failure */
dberr_t
row_log_apply (
@ -4039,20 +3778,24 @@ row_log_apply(
ut_stage_alter_t * stage )
{
dberr_t error ;
row_log_t * log ;
row_merge_dup_t dup = { index , table , NULL , 0 } ;
DBUG_ENTER ( " row_log_apply " ) ;
ut_ad ( dict_index_is_online_ddl ( index ) ) ;
ut_ad ( dict_index_is_online_ddl ( index )
| | ( index - > online_log
& & index - > online_status = = ONLINE_INDEX_COMPLETE ) ) ;
ut_ad ( ! dict_index_is_clust ( index ) ) ;
stage - > begin_phase_log_index ( ) ;
if ( stage ) {
stage - > begin_phase_log_index ( ) ;
}
log_free_check ( ) ;
index - > lock . x_lock ( SRW_LOCK_CALL ) ;
if ( ! dict_table_is_corrupted ( index - > table ) ) {
if ( ! dict_table_is_corrupted ( index - > table )
& & index - > online_log ) {
error = row_log_apply_ops ( trx , index , & dup , stage ) ;
} else {
error = DB_SUCCESS ;
@ -4067,17 +3810,15 @@ row_log_apply(
index - > table - > drop_aborted = TRUE ;
dict_index_set_online_status ( index , ONLINE_INDEX_ABORTED ) ;
} else {
} else if ( stage ) {
/* Mark the index as completed only when it is
being called by DDL thread */
ut_ad ( dup . n_dup = = 0 ) ;
dict_index_set_online_status ( index , ONLINE_INDEX_COMPLETE ) ;
}
log = index - > online_log ;
index - > online_log = NULL ;
index - > lock . x_unlock ( ) ;
row_log_free ( log ) ;
DBUG_RETURN ( error ) ;
}
@ -4102,6 +3843,12 @@ static void row_log_table_empty(dict_index_t *index)
}
}
/** Report the latest error recorded in the online log of an index.
@param index  index that has an online log attached
@return the error state of index->online_log */
dberr_t row_log_get_error(const dict_index_t *index)
{
  ut_ad(index->online_log);
  return index->online_log->error;
}
void dict_table_t : : clear ( que_thr_t * thr )
{
bool rebuild = false ;
@ -4138,3 +3885,279 @@ void dict_table_t::clear(que_thr_t *thr)
index - > clear ( thr ) ;
}
}
/** Position on the clustered index record that matches the given row
reference and walk its version chain until the version whose roll
pointer matches this undo record is found.
@param tuple      row reference of the clustered index record
@param index      clustered index
@param clust_rec  out: current (latest) clustered index record
@param offsets    in/out: offsets of the record version being visited
@return the matching record version
@retval nullptr when no version with a matching roll pointer exists */
const rec_t *
UndorecApplier::get_old_rec(const dtuple_t &tuple, dict_index_t *index,
                            const rec_t **clust_rec, rec_offs **offsets)
{
  ut_ad(index->is_primary());
  btr_pcur_t pcur;

  bool found= row_search_on_row_ref(&pcur, BTR_MODIFY_LEAF,
                                    index->table, &tuple, &mtr);
  ut_a(found);
  *clust_rec= btr_pcur_get_rec(&pcur);

  ulint len= 0;
  rec_t *prev_version;
  const rec_t *version= *clust_rec;
  do
  {
    *offsets= rec_get_offsets(version, index, *offsets,
                              index->n_core_fields, ULINT_UNDEFINED,
                              &heap);
    roll_ptr_t roll_ptr= trx_read_roll_ptr(
      rec_get_nth_field(version, *offsets, index->db_roll_ptr(), &len));
    ut_ad(len == DATA_ROLL_PTR_LEN);
    /* Stop at the version that was produced by this undo record. */
    if (is_same(roll_ptr))
      return version;
    trx_undo_prev_version_build(*clust_rec, &mtr, version, index,
                                *offsets, heap, &prev_version, nullptr,
                                nullptr, 0, block);
    version= prev_version;
  }
  while (version);

  /* The version chain was exhausted without a match. */
  return nullptr;
}
/** Clear out all online log of other online indexes after
encountering the error during row_log_apply() in DML thread
@param table	table which does online DDL */
static void row_log_mark_other_online_index_abort(dict_table_t *table)
{
  dict_index_t *clust_index= dict_table_get_first_index(table);

  /* Abort and corrupt every secondary index that is still being
  built online, so that the DDL thread will notice the failure. */
  for (dict_index_t *index= dict_table_get_next_index(clust_index);
       index; index= dict_table_get_next_index(index))
  {
    if (index->online_log &&
        index->online_status <= ONLINE_INDEX_CREATION &&
        !index->is_corrupted())
    {
      index->lock.x_lock(SRW_LOCK_CALL);
      row_log_abort_sec(index);
      index->type|= DICT_CORRUPT;
      index->lock.x_unlock();
      MONITOR_ATOMIC_INC(MONITOR_BACKGROUND_DROP_INDEX);
    }
  }

  /* Detach the (dummy) online log from the clustered index so that
  concurrent DML stops logging for this ALTER TABLE. */
  clust_index->lock.x_lock(SRW_LOCK_CALL);
  clust_index->online_log= nullptr;
  clust_index->lock.x_unlock();
  table->drop_aborted= TRUE;
}
void UndorecApplier : : log_insert ( const dtuple_t & tuple ,
dict_index_t * clust_index )
{
DEBUG_SYNC_C ( " row_log_insert_handle " ) ;
ut_ad ( clust_index - > is_primary ( ) ) ;
rec_offs offsets_ [ REC_OFFS_NORMAL_SIZE ] ;
rec_offs * offsets = offsets_ ;
rec_offs_init ( offsets_ ) ;
mtr . start ( ) ;
const rec_t * rec ;
const rec_t * match_rec = get_old_rec ( tuple , clust_index , & rec , & offsets ) ;
if ( ! match_rec )
{
mtr . commit ( ) ;
return ;
}
const rec_t * copy_rec = match_rec ;
if ( match_rec = = rec )
{
copy_rec = rec_copy ( mem_heap_alloc (
heap , rec_offs_size ( offsets ) ) , match_rec , offsets ) ;
rec_offs_make_valid ( copy_rec , clust_index , true , offsets ) ;
}
mtr . commit ( ) ;
dict_table_t * table = clust_index - > table ;
clust_index - > lock . s_lock ( SRW_LOCK_CALL ) ;
if ( clust_index - > online_log & &
! clust_index - > online_log_is_dummy ( ) & &
clust_index - > online_status < = ONLINE_INDEX_CREATION )
{
row_log_table_insert ( copy_rec , clust_index , offsets ) ;
clust_index - > lock . s_unlock ( ) ;
}
else
{
clust_index - > lock . s_unlock ( ) ;
row_ext_t * ext ;
dtuple_t * row = row_build ( ROW_COPY_POINTERS , clust_index ,
copy_rec , offsets , table , nullptr , nullptr , & ext , heap ) ;
if ( table - > n_v_cols )
{
/* Update the row with virtual column values present
in the undo log or update vector */
if ( type = = TRX_UNDO_UPD_DEL_REC )
row_upd_replace_vcol ( row , table , update , false ,
nullptr ,
( cmpl_info & UPD_NODE_NO_ORD_CHANGE )
? nullptr : undo_rec ) ;
else
trx_undo_read_v_cols ( table , undo_rec , row , false ) ;
}
bool success = true ;
dict_index_t * index = dict_table_get_next_index ( clust_index ) ;
while ( index )
{
index - > lock . s_lock ( SRW_LOCK_CALL ) ;
if ( index - > online_log & &
index - > online_status < = ONLINE_INDEX_CREATION & &
! index - > is_corrupted ( ) )
{
dtuple_t * entry = row_build_index_entry_low ( row , ext , index ,
heap , ROW_BUILD_NORMAL ) ;
success = row_log_online_op ( index , entry , trx_id ) ;
}
index - > lock . s_unlock ( ) ;
if ( ! success )
{
row_log_mark_other_online_index_abort ( index - > table ) ;
return ;
}
index = dict_table_get_next_index ( index ) ;
}
}
}
void UndorecApplier : : log_update ( const dtuple_t & tuple ,
dict_index_t * clust_index )
{
rec_offs offsets_ [ REC_OFFS_NORMAL_SIZE ] ;
rec_offs offsets2_ [ REC_OFFS_NORMAL_SIZE ] ;
rec_offs * offsets = offsets_ ;
rec_offs * prev_offsets = offsets2_ ;
rec_offs_init ( offsets_ ) ;
rec_offs_init ( offsets2_ ) ;
dict_table_t * table = clust_index - > table ;
clust_index - > lock . s_lock ( SRW_LOCK_CALL ) ;
bool table_rebuild =
( clust_index - > online_log
& & ! clust_index - > online_log_is_dummy ( )
& & clust_index - > online_status < = ONLINE_INDEX_CREATION ) ;
clust_index - > lock . s_unlock ( ) ;
mtr . start ( ) ;
const rec_t * rec ;
rec_t * prev_version ;
bool is_update = ( type = = TRX_UNDO_UPD_EXIST_REC ) ;
const rec_t * match_rec = get_old_rec ( tuple , clust_index , & rec , & offsets ) ;
if ( ! match_rec )
{
mtr . commit ( ) ;
return ;
}
if ( table_rebuild )
{
const rec_t * copy_rec = match_rec ;
if ( match_rec = = rec )
copy_rec = rec_copy ( mem_heap_alloc (
heap , rec_offs_size ( offsets ) ) , match_rec , offsets ) ;
trx_undo_prev_version_build ( rec , & mtr , match_rec , clust_index ,
offsets , heap , & prev_version , nullptr ,
nullptr , 0 , block ) ;
prev_offsets = rec_get_offsets ( prev_version , clust_index , prev_offsets ,
clust_index - > n_core_fields ,
ULINT_UNDEFINED , & heap ) ;
rec_offs_make_valid ( copy_rec , clust_index , true , offsets ) ;
mtr . commit ( ) ;
clust_index - > lock . s_lock ( SRW_LOCK_CALL ) ;
/* Recheck whether clustered index online log has been cleared */
if ( clust_index - > online_log )
{
if ( is_update )
{
const dtuple_t * rebuilt_old_pk = row_log_table_get_pk (
prev_version , clust_index , prev_offsets , nullptr , & heap ) ;
row_log_table_update ( copy_rec , clust_index , offsets , rebuilt_old_pk ) ;
}
else
row_log_table_delete ( prev_version , clust_index , prev_offsets , nullptr ) ;
}
clust_index - > lock . s_unlock ( ) ;
return ;
}
dtuple_t * row = nullptr ;
row_ext_t * new_ext ;
if ( match_rec ! = rec )
row = row_build ( ROW_COPY_POINTERS , clust_index , match_rec , offsets ,
clust_index - > table , NULL , NULL , & new_ext , heap ) ;
else
row = row_build ( ROW_COPY_DATA , clust_index , rec , offsets ,
clust_index - > table , NULL , NULL , & new_ext , heap ) ;
mtr . commit ( ) ;
row_ext_t * old_ext ;
dtuple_t * old_row = nullptr ;
if ( ! ( this - > cmpl_info & UPD_NODE_NO_ORD_CHANGE ) )
{
for ( ulint i = 0 ; i < dict_table_get_n_v_cols ( table ) ; i + + )
dfield_get_type (
dtuple_get_nth_v_field ( row , i ) ) - > mtype = DATA_MISSING ;
}
if ( is_update )
{
old_row = dtuple_copy ( row , heap ) ;
row_upd_replace ( old_row , & old_ext , clust_index , update , heap ) ;
}
if ( table - > n_v_cols )
row_upd_replace_vcol ( row , table , update , false , nullptr ,
( cmpl_info & UPD_NODE_NO_ORD_CHANGE )
? nullptr : this - > undo_rec ) ;
bool success = true ;
dict_index_t * index = dict_table_get_next_index ( clust_index ) ;
while ( index )
{
index - > lock . s_lock ( SRW_LOCK_CALL ) ;
if ( index - > online_log & &
index - > online_status < = ONLINE_INDEX_CREATION & &
! index - > is_corrupted ( ) )
{
if ( is_update )
{
dtuple_t * old_entry = row_build_index_entry_low (
old_row , old_ext , index , heap , ROW_BUILD_NORMAL ) ;
success = row_log_online_op ( index , old_entry , 0 ) ;
dtuple_t * new_entry = row_build_index_entry_low (
row , new_ext , index , heap , ROW_BUILD_NORMAL ) ;
if ( success )
success = row_log_online_op ( index , new_entry , trx_id ) ;
}
else
{
dtuple_t * old_entry = row_build_index_entry_low (
row , new_ext , index , heap , ROW_BUILD_NORMAL ) ;
success = row_log_online_op ( index , old_entry , 0 ) ;
}
}
index - > lock . s_unlock ( ) ;
if ( ! success )
{
row_log_mark_other_online_index_abort ( index - > table ) ;
return ;
}
index = dict_table_get_next_index ( index ) ;
}
}