@ -67,6 +67,11 @@ struct st_translog_buffer
LSN last_lsn ;
/* This buffer offset in the file */
TRANSLOG_ADDRESS offset ;
/*
Next buffer offset in the file ( it is not always offset + size ,
in case of flush by LSN it can be offset + size - TRANSLOG_PAGE_SIZE )
*/
TRANSLOG_ADDRESS next_buffer_offset ;
/*
How much written ( or will be written when copy_to_buffer_in_progress
become 0 ) to this buffer
@ -150,7 +155,10 @@ struct st_translog_descriptor
/* Last flushed LSN */
LSN flushed ;
/* Last LSN sent to the disk (but maybe not written yet) */
LSN sent_to_file ;
/* All what is after this addess is not sent to disk yet */
TRANSLOG_ADDRESS in_buffers_only ;
pthread_mutex_t sent_to_file_lock ;
} ;
@ -187,6 +195,8 @@ static my_bool write_hook_for_undo(enum translog_record_type type,
TRN * trn , LSN * lsn ,
struct st_translog_parts * parts ) ;
static my_bool translog_page_validator ( uchar * page_addr , uchar * data_ptr ) ;
/*
Initialize log_record_type_descriptors
@ -672,7 +682,6 @@ my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc)
static my_bool translog_buffer_init ( struct st_translog_buffer * buffer )
{
DBUG_ENTER ( " translog_buffer_init " ) ;
/* This buffer offset */
buffer - > last_lsn = LSN_IMPOSSIBLE ;
/* This Buffer File */
buffer - > file = - 1 ;
@ -966,7 +975,8 @@ static void translog_put_sector_protection(uchar *page,
static uint32 translog_crc ( uchar * area , uint length )
{
return crc32 ( 0L , ( unsigned char * ) area , length ) ;
DBUG_ENTER ( " translog_crc " ) ;
DBUG_RETURN ( crc32 ( 0L , ( unsigned char * ) area , length ) ) ;
}
@ -1180,6 +1190,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
DBUG_ASSERT ( buffer_no = = buffer - > buffer_no ) ;
buffer - > last_lsn = LSN_IMPOSSIBLE ;
buffer - > offset = log_descriptor . horizon ;
buffer - > next_buffer_offset = LSN_IMPOSSIBLE ;
buffer - > file = log_descriptor . log_file_num [ 0 ] ;
buffer - > overlay = 0 ;
buffer - > size = 0 ;
@ -1254,45 +1265,117 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
translog_cursor_init ( cursor , new_buffer , new_buffer_no ) ;
else
translog_start_buffer ( new_buffer , cursor , new_buffer_no ) ;
log_descriptor . buffers [ old_buffer_no ] . next_buffer_offset = new_buffer - > offset ;
translog_new_page_header ( horizon , cursor ) ;
DBUG_RETURN ( 0 ) ;
}
/*
Set max LSN sent to file
Sets max LSN sent to file , and address from which data is only in the buffer
SYNOPSIS
translog_set_sent_to_file ( )
lsn LSN to assign
in_buffers to assign to in_buffers_only
TODO : use atomic operations if possible ( 64 bit architectures ? )
*/
static void translog_set_sent_to_file ( LSN * lsn )
static void translog_set_sent_to_file ( LSN lsn , TRANSLOG_ADDRESS in_buffers )
{
DBUG_ENTER ( " translog_set_sent_to_file " ) ;
pthread_mutex_lock ( & log_descriptor . sent_to_file_lock ) ;
DBUG_ASSERT ( cmp_translog_addr ( * lsn , log_descriptor . sent_to_file ) > = 0 ) ;
log_descriptor . sent_to_file = * lsn ;
DBUG_PRINT ( " enter " , ( " lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx) "
" in_buffers_only: (%lu,0x%lx) " ,
( ulong ) LSN_FILE_NO ( lsn ) ,
( ulong ) LSN_OFFSET ( lsn ) ,
( ulong ) LSN_FILE_NO ( in_buffers ) ,
( ulong ) LSN_OFFSET ( in_buffers ) ,
( ulong ) LSN_FILE_NO ( log_descriptor . in_buffers_only ) ,
( ulong ) LSN_OFFSET ( log_descriptor . in_buffers_only ) ) ) ;
DBUG_ASSERT ( cmp_translog_addr ( lsn , log_descriptor . sent_to_file ) > = 0 ) ;
log_descriptor . sent_to_file = lsn ;
/* LSN_IMPOSSIBLE == 0 => it will work for very first time */
if ( cmp_translog_addr ( in_buffers , log_descriptor . in_buffers_only ) > 0 )
{
log_descriptor . in_buffers_only = in_buffers ;
DBUG_PRINT ( " info " , ( " set new in_buffers_only " ) ) ;
}
pthread_mutex_unlock ( & log_descriptor . sent_to_file_lock ) ;
DBUG_VOID_RETURN ;
}
/*
Get max LSN send to file
Sets address from which data is only in the buffer
SYNOPSIS
translog_set_only_in_buffers ( )
lsn LSN to assign
in_buffers to assign to in_buffers_only
*/
static void translog_set_only_in_buffers ( TRANSLOG_ADDRESS in_buffers )
{
DBUG_ENTER ( " translog_set_only_in_buffers " ) ;
pthread_mutex_lock ( & log_descriptor . sent_to_file_lock ) ;
DBUG_PRINT ( " enter " , ( " in_buffers: (%lu,0x%lx) "
" in_buffers_only: (%lu,0x%lx) " ,
( ulong ) LSN_FILE_NO ( in_buffers ) ,
( ulong ) LSN_OFFSET ( in_buffers ) ,
( ulong ) LSN_FILE_NO ( log_descriptor . in_buffers_only ) ,
( ulong ) LSN_OFFSET ( log_descriptor . in_buffers_only ) ) ) ;
/* LSN_IMPOSSIBLE == 0 => it will work for very first time */
if ( cmp_translog_addr ( in_buffers , log_descriptor . in_buffers_only ) > 0 )
{
log_descriptor . in_buffers_only = in_buffers ;
DBUG_PRINT ( " info " , ( " set new in_buffers_only " ) ) ;
}
pthread_mutex_unlock ( & log_descriptor . sent_to_file_lock ) ;
DBUG_VOID_RETURN ;
}
/*
Gets address from which data is only in the buffer
SYNOPSIS
translog_only_in_buffers ( )
RETURN
address from which data is only in the buffer
*/
static TRANSLOG_ADDRESS translog_only_in_buffers ( )
{
register TRANSLOG_ADDRESS addr ;
DBUG_ENTER ( " translog_only_in_buffers " ) ;
pthread_mutex_lock ( & log_descriptor . sent_to_file_lock ) ;
addr = log_descriptor . in_buffers_only ;
pthread_mutex_unlock ( & log_descriptor . sent_to_file_lock ) ;
DBUG_RETURN ( addr ) ;
}
/*
Get max LSN sent to file
SYNOPSIS
translog_get_sent_to_file ( )
lsn LSN to value
RETURN
max LSN send to file
*/
static void translog_get_sent_to_file ( LSN * lsn )
static LSN translog_get_sent_to_file ( )
{
register LSN lsn ;
DBUG_ENTER ( " translog_get_sent_to_file " ) ;
pthread_mutex_lock ( & log_descriptor . sent_to_file_lock ) ;
* lsn = log_descriptor . sent_to_file ;
lsn = log_descriptor . sent_to_file ;
pthread_mutex_unlock ( & log_descriptor . sent_to_file_lock ) ;
DBUG_VOID_RETURN ;
DBUG_RETURN ( lsn ) ;
}
@ -1532,16 +1615,20 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
file . file = buffer - > file ;
for ( i = 0 ; i < buffer - > size ; i + = TRANSLOG_PAGE_SIZE )
{
TRANSLOG_ADDRESS addr = ( buffer - > offset + i ) ;
TRANSLOG_VALIDATOR_DATA data ;
data . addr = & addr ;
DBUG_ASSERT ( log_descriptor . pagecache - > block_size = = TRANSLOG_PAGE_SIZE ) ;
DBUG_ASSERT ( i + TRANSLOG_PAGE_SIZE < = buffer - > size ) ;
if ( pagecache_write ( log_descriptor . pagecache ,
if ( pagecache_injec t ( log_descriptor . pagecache ,
& file ,
( LSN_OFFSET ( buffer - > offset ) + i ) / TRANSLOG_PAGE_SIZE ,
3 ,
buffer - > buffer + i ,
PAGECACHE_PLAIN_PAGE ,
PAGECACHE_LOCK_LEFT_UNLOCKED ,
PAGECACHE_PIN_LEFT_UNPINNED , PAGECACHE_WRITE_DONE , 0 ) )
PAGECACHE_PIN_LEFT_UNPINNED , 0 ,
& translog_page_validator , ( uchar * ) & data ) )
{
UNRECOVERABLE_ERROR ( ( " Can't write page (%lu,0x%lx) to pagecache " ,
( ulong ) buffer - > file ,
@ -1559,9 +1646,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
( ulong ) buffer - > size , errno ) ) ;
DBUG_RETURN ( 1 ) ;
}
if ( LSN_OFFSET ( buffer - > last_lsn ) ! = 0 ) /* if buffer->last_lsn is set */
translog_set_sent_to_file ( & buffer - > last_lsn ) ;
if ( LSN_OFFSET ( buffer - > last_lsn ) ! = 0 ) /* if buffer->last_lsn is set */
translog_set_sent_to_file ( buffer - > last_lsn ,
buffer - > next_buffer_offset ) ;
else
translog_set_only_in_buffers ( buffer - > next_buffer_offset ) ;
/* Free buffer */
buffer - > file = - 1 ;
buffer - > overlay = 0 ;
@ -1747,6 +1837,59 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
}
/*
Lock the loghandler
SYNOPSIS
translog_lock ( )
RETURN
0 OK
1 Error
*/
my_bool translog_lock ( )
{
struct st_translog_buffer * current_buffer ;
DBUG_ENTER ( " translog_lock " ) ;
/*
Locking the loghandler mean locking current buffer , but it can change
during locking , so we should check it
*/
for ( ; ; )
{
current_buffer = log_descriptor . bc . buffer ;
if ( translog_buffer_lock ( current_buffer ) )
DBUG_RETURN ( 1 ) ;
if ( log_descriptor . bc . buffer = = current_buffer )
break ;
translog_buffer_unlock ( current_buffer ) ;
}
DBUG_RETURN ( 0 ) ;
}
/*
Unlock the loghandler
SYNOPSIS
translog_unlock ( )
RETURN
0 OK
1 Error
*/
my_bool translog_unlock ( )
{
DBUG_ENTER ( " translog_unlock " ) ;
translog_buffer_unlock ( log_descriptor . bc . buffer ) ;
DBUG_RETURN ( 0 ) ;
}
/*
Get log page by file number and offset of the beginning of the page
@ -1763,7 +1906,7 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
static uchar * translog_get_page ( TRANSLOG_VALIDATOR_DATA * data , uchar * buffer )
{
TRANSLOG_ADDRESS addr = * ( data - > addr ) ;
TRANSLOG_ADDRESS addr = * ( data - > addr ) , in_buffers ;
uint cache_index ;
uint32 file_no = LSN_FILE_NO ( addr ) ;
DBUG_ENTER ( " translog_get_page " ) ;
@ -1775,6 +1918,107 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
/* it is really page address */
DBUG_ASSERT ( LSN_OFFSET ( addr ) % TRANSLOG_PAGE_SIZE = = 0 ) ;
in_buffers = translog_only_in_buffers ( ) ;
DBUG_PRINT ( " info " , ( " in_buffers: (%lu,0x%lx) " ,
( ulong ) LSN_FILE_NO ( in_buffers ) ,
( ulong ) LSN_OFFSET ( in_buffers ) ) ) ;
if ( in_buffers ! = LSN_IMPOSSIBLE & &
cmp_translog_addr ( addr , in_buffers ) > = 0 )
{
translog_lock ( ) ;
/* recheck with locked loghandler */
in_buffers = translog_only_in_buffers ( ) ;
if ( cmp_translog_addr ( addr , in_buffers ) > = 0 )
{
uint16 buffer_no = log_descriptor . bc . buffer_no ;
uint16 buffer_start = buffer_no ;
struct st_translog_buffer * buffer_unlock = log_descriptor . bc . buffer ;
struct st_translog_buffer * curr_buffer = log_descriptor . bc . buffer ;
for ( ; ; )
{
/*
if the page is in the buffer and it is the last version of the
page ( in case of devision the page bu buffer flush
*/
if ( curr_buffer - > file ! = - 1 & &
cmp_translog_addr ( addr , curr_buffer - > offset ) > = 0 & &
cmp_translog_addr ( addr ,
( curr_buffer - > next_buffer_offset ?
curr_buffer - > next_buffer_offset :
curr_buffer - > offset + curr_buffer - > size ) ) < 0 )
{
int is_last_unfinished_page ;
uint last_protected_sector = 0 ;
uchar * from , * table ;
translog_wait_for_writers ( curr_buffer ) ;
DBUG_ASSERT ( LSN_FILE_NO ( addr ) = = LSN_FILE_NO ( curr_buffer - > offset ) ) ;
from = curr_buffer - > buffer + ( addr - curr_buffer - > offset ) ;
memcpy ( buffer , from , TRANSLOG_PAGE_SIZE ) ;
is_last_unfinished_page = ( ( log_descriptor . bc . buffer = =
curr_buffer ) & &
( log_descriptor . bc . ptr > = from ) & &
( log_descriptor . bc . ptr <
from + TRANSLOG_PAGE_SIZE ) ) ;
if ( is_last_unfinished_page & &
( buffer [ TRANSLOG_PAGE_FLAGS ] & TRANSLOG_SECTOR_PROTECTION ) )
{
last_protected_sector = ( ( log_descriptor . bc . previous_offset - 1 ) /
DISK_DRIVE_SECTOR_SIZE ) ;
table = buffer + log_descriptor . page_overhead -
( TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE ) * 2 ;
}
DBUG_ASSERT ( buffer_unlock = = curr_buffer ) ;
translog_buffer_unlock ( buffer_unlock ) ;
if ( is_last_unfinished_page )
{
uint i ;
/*
This is last unfinished page = > we should not check CRC and
remove only that protection which already installed ( no need
to check it )
We do not check the flag of sector protection , because if
( buffer [ TRANSLOG_PAGE_FLAGS ] & TRANSLOG_SECTOR_PROTECTION ) is
not set then last_protected_sector will be 0 so following loop
will be never executed
*/
DBUG_PRINT ( " info " , ( " This is last unfinished page, "
" last protected sector %u " ,
last_protected_sector ) ) ;
for ( i = 1 ; i < = last_protected_sector ; i + + )
{
uint index = i * 2 ;
uint offset = i * DISK_DRIVE_SECTOR_SIZE ;
DBUG_PRINT ( " info " , ( " Sector %u: 0x%02x%02x <- 0x%02x%02x " ,
i , buffer [ offset ] , buffer [ offset + 1 ] ,
table [ index ] , table [ index + 1 ] ) ) ;
buffer [ offset ] = table [ index ] ;
buffer [ offset + 1 ] = table [ index + 1 ] ;
}
}
else
{
/*
This IF should be true because we use in - memory data which
supposed to be correct .
*/
if ( translog_page_validator ( ( uchar * ) buffer , ( uchar * ) data ) )
buffer = NULL ;
}
DBUG_RETURN ( buffer ) ;
}
buffer_no = ( buffer_no + 1 ) % TRANSLOG_BUFFERS_NO ;
curr_buffer = log_descriptor . buffers + buffer_no ;
translog_buffer_lock ( curr_buffer ) ;
translog_buffer_unlock ( buffer_unlock ) ;
buffer_unlock = curr_buffer ;
/* we can't make full circle */
DBUG_ASSERT ( buffer_start ! = buffer_no ) ;
}
}
translog_unlock ( ) ;
}
if ( ( cache_index = LSN_FILE_NO ( log_descriptor . horizon ) - file_no ) <
OPENED_FILES_NUM )
{
@ -2000,6 +2244,7 @@ my_bool translog_init(const char *directory,
DBUG_RETURN ( 1 ) ;
}
log_descriptor . in_buffers_only = LSN_IMPOSSIBLE ;
/* max size of one log size (for new logs creation) */
log_descriptor . log_file_max_size =
log_file_max_size - ( log_file_max_size % TRANSLOG_PAGE_SIZE ) ;
@ -2269,7 +2514,9 @@ my_bool translog_init(const char *directory,
}
/* all LSNs that are on disk are flushed */
log_descriptor . sent_to_file = log_descriptor . flushed = log_descriptor . horizon ;
log_descriptor . sent_to_file =
log_descriptor . flushed = log_descriptor . horizon ;
log_descriptor . in_buffers_only = log_descriptor . bc . buffer - > offset ;
/*
horizon is ( potentially ) address of the next LSN we need decrease
it to signal that all LSNs before it are flushed
@ -2366,57 +2613,6 @@ void translog_destroy()
}
/*
Lock the loghandler
SYNOPSIS
translog_lock ( )
RETURN
0 OK
1 Error
*/
my_bool translog_lock ( )
{
struct st_translog_buffer * current_buffer ;
DBUG_ENTER ( " translog_lock " ) ;
/*
Locking the loghandler mean locking current buffer , but it can change
during locking , so we should check it
*/
for ( ; ; )
{
current_buffer = log_descriptor . bc . buffer ;
if ( translog_buffer_lock ( current_buffer ) )
DBUG_RETURN ( 1 ) ;
if ( log_descriptor . bc . buffer = = current_buffer )
break ;
translog_buffer_unlock ( current_buffer ) ;
}
DBUG_RETURN ( 0 ) ;
}
/*
Unlock the loghandler
SYNOPSIS
translog_unlock ( )
RETURN
0 OK
1 Error
*/
my_bool translog_unlock ( )
{
DBUG_ENTER ( " translog_unlock " ) ;
translog_buffer_unlock ( log_descriptor . bc . buffer ) ;
DBUG_RETURN ( 0 ) ;
}
# define translog_buffer_lock_assert_owner(B) \
@ -2923,6 +3119,7 @@ static my_bool translog_advance_pointer(uint pages, uint16 last_page_data)
log_descriptor . horizon + = min_offset ; /* offset increasing */
}
translog_start_buffer ( new_buffer , & log_descriptor . bc , new_buffer_no ) ;
old_buffer - > next_buffer_offset = new_buffer - > offset ;
if ( translog_buffer_unlock ( old_buffer ) )
DBUG_RETURN ( 1 ) ;
offset - = min_offset ;
@ -3523,7 +3720,7 @@ static my_bool translog_relative_LSN_encode(struct st_translog_parts *parts,
uchar * dst_ptr = compressed_LSNs + ( MAX_NUMBER_OF_LSNS_PER_RECORD *
COMPRESSED_LSN_MAX_STORE_SIZE ) ;
for ( src_ptr = buffer + lsns_len - LSN_STORE_SIZE ;
src_ptr > = buffer ;
src_ptr > = ( uchar * ) buffer ;
src_ptr - = LSN_STORE_SIZE )
{
ref = lsn_korr ( src_ptr ) ;
@ -3536,7 +3733,7 @@ static my_bool translog_relative_LSN_encode(struct st_translog_parts *parts,
dst_ptr ) ;
parts - > record_length - = ( economy = lsns_len - part - > length ) ;
DBUG_PRINT ( " info " , ( " new length of LSNs: %u economy: %d " ,
part - > length , economy ) ) ;
( uint ) part - > length , economy ) ) ;
parts - > total_record_length - = economy ;
part - > str = ( char * ) dst_ptr ;
}
@ -5363,7 +5560,7 @@ static void translog_force_current_buffer_to_finish()
struct st_translog_buffer * new_buffer = ( log_descriptor . buffers +
new_buffer_no ) ;
struct st_translog_buffer * old_buffer = log_descriptor . bc . buffer ;
uchar * data = log_descriptor . bc . ptr - log_descriptor . bc . current_page_fill ;
uchar * data = log_descriptor . bc . ptr - log_descriptor . bc . current_page_fill ;
uint16 left = TRANSLOG_PAGE_SIZE - log_descriptor . bc . current_page_fill ;
uint16 current_page_fill , write_counter , previous_offset ;
DBUG_ENTER ( " translog_force_current_buffer_to_finish " ) ;
@ -5458,13 +5655,14 @@ static void translog_force_current_buffer_to_finish()
if ( left )
{
memcpy ( new_buffer - > buffer , data , current_page_fill ) ;
log_descriptor . bc . ptr + = current_page_fill ;
log_descriptor . bc . ptr + = current_page_fill ;
log_descriptor . bc . buffer - > size = log_descriptor . bc . current_page_fill =
current_page_fill ;
new_buffer - > overlay = old_buffer ;
}
else
translog_new_page_header ( & log_descriptor . horizon , & log_descriptor . bc ) ;
old_buffer - > next_buffer_offset = new_buffer - > offset ;
DBUG_VOID_RETURN ;
}
@ -5527,7 +5725,7 @@ my_bool translog_flush(LSN lsn)
DBUG_RETURN ( 0 ) ;
}
/* send to the file if it is not sent */
translog_get_sent_to_file ( & sent_to_file ) ;
sent_to_file = translog_get_sent_to_file ( ) ;
if ( cmp_translog_addr ( sent_to_file , lsn ) > = 0 )
break ;