@ -36,8 +36,10 @@
# define MERGEBUFF2 31
# define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
# define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
# define DISK_BUFFER_SIZE (IO_SIZE*16 )
# define DISK_BUFFER_SIZE (IO_SIZE*128 )
/* How many keys we can keep in memory */
typedef ulonglong ha_keys ;
/*
Pointers of functions for store and read keys from temp file
@ -47,41 +49,42 @@ extern void print_error(const char *fmt,...);
/* Functions defined in this file */
static ha_rows find_all_keys ( MARIA_SORT_PARAM * info , uint keys ,
static ha_rows find_all_keys ( MARIA_SORT_PARAM * info , ha_keys keys ,
uchar * * sort_keys ,
DYNAMIC_ARRAY * buffpek , int * maxbuffer ,
DYNAMIC_ARRAY * buffpek , u int * maxbuffer ,
IO_CACHE * tempfile ,
IO_CACHE * tempfile_for_exceptions ) ;
static int write_keys ( MARIA_SORT_PARAM * info , uchar * * sort_keys ,
uint count , BUFFPEK * buffpek , IO_CACHE * tempfile ) ;
static int write_keys ( MARIA_SORT_PARAM * info , uchar * * sort_keys ,
ha_keys count , BUFFPEK * buffpek , IO_CACHE * tempfile ) ;
static int write_key ( MARIA_SORT_PARAM * info , uchar * key ,
IO_CACHE * tempfile ) ;
static int write_index ( MARIA_SORT_PARAM * info , uchar * * sort_keys ,
uint count ) ;
static int merge_many_buff ( MARIA_SORT_PARAM * info , uint keys ,
ha_keys count ) ;
static int merge_many_buff ( MARIA_SORT_PARAM * info , ha_keys keys ,
uchar * * sort_keys ,
BUFFPEK * buffpek , int * maxbuffer ,
BUFFPEK * buffpek , u int * maxbuffer ,
IO_CACHE * t_file ) ;
static uin t read_to_buffer ( IO_CACHE * fromfile , BUFFPEK * buffpek ,
uint sort_length ) ;
static int merge_buffers ( MARIA_SORT_PARAM * info , uint keys ,
static my_off_ t read_to_buffer ( IO_CACHE * fromfile , BUFFPEK * buffpek ,
uint sort_length ) ;
static int merge_buffers ( MARIA_SORT_PARAM * info , ha_keys keys ,
IO_CACHE * from_file , IO_CACHE * to_file ,
uchar * * sort_keys , BUFFPEK * lastbuff ,
BUFFPEK * Fb , BUFFPEK * Tb ) ;
static int merge_index ( MARIA_SORT_PARAM * , uint , uchar * * , BUFFPEK * , int ,
static int merge_index ( MARIA_SORT_PARAM * , ha_keys , uchar * * , BUFFPEK * , u int,
IO_CACHE * ) ;
static int flush_maria_ft_buf ( MARIA_SORT_PARAM * info ) ;
static int write_keys_varlen ( MARIA_SORT_PARAM * info , uchar * * sort_keys ,
uint count , BUFFPEK * buffpek ,
static int write_keys_varlen ( MARIA_SORT_PARAM * info , uchar * * sort_keys ,
ha_keys count , BUFFPEK * buffpek ,
IO_CACHE * tempfile ) ;
static uin t read_to_buffer_varlen ( IO_CACHE * fromfile , BUFFPEK * buffpek ,
uint sort_length ) ;
static my_off_ t read_to_buffer_varlen ( IO_CACHE * fromfile , BUFFPEK * buffpek ,
uint sort_length ) ;
static int write_merge_key ( MARIA_SORT_PARAM * info , IO_CACHE * to_file ,
uchar * key , uint sort_length , uint count ) ;
uchar * key , uint sort_length , ha_keys count ) ;
static int write_merge_key_varlen ( MARIA_SORT_PARAM * info ,
IO_CACHE * to_file , uchar * key ,
uint sort_length , uint count ) ;
IO_CACHE * to_file ,
uchar * key , uint sort_length ,
ha_keys count ) ;
static inline int
my_var_write ( MARIA_SORT_PARAM * info , IO_CACHE * to_file , uchar * bufs ) ;
@ -102,11 +105,11 @@ my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
int _ma_create_index_by_sort ( MARIA_SORT_PARAM * info , my_bool no_messages ,
size_t sortbuff_size )
{
int error , maxbuffer , skr ;
size_t memavl , old_memavl ;
uint keys , sort_length ;
int error ;
uint sort_length , maxbuffer ;
size_t memavl , old_memavl ;
DYNAMIC_ARRAY buffpek ;
ha_rows records ;
ha_rows records , keys ;
uchar * * sort_keys ;
IO_CACHE tempfile , tempfile_for_exceptions ;
DBUG_ENTER ( " _ma_create_index_by_sort " ) ;
@ -117,7 +120,7 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
if ( info - > keyinfo - > flag & HA_VAR_LENGTH_KEY )
{
info - > write_keys = write_keys_varlen ;
info - > read_to_buffer = read_to_buffer_varlen ;
info - > read_to_buffer = read_to_buffer_varlen ;
info - > write_key = write_merge_key_varlen ;
}
else
@ -140,31 +143,53 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
while ( memavl > = MIN_SORT_MEMORY )
{
if ( ( records < UINT_MAX32 ) & &
( ( my_off_t ) ( records + 1 ) *
( sort_length + sizeof ( char * ) ) < = ( my_off_t ) memavl ) )
keys = ( uint ) records + 1 ;
/* Check if we can fit all keys into memory */
if ( ( ( ulonglong ) ( records + 1 ) *
( sort_length + sizeof ( char * ) ) < = memavl ) )
keys = records + 1 ;
else if ( ( info - > sort_info - > param - > testflag &
( T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS ) ) = =
T_FORCE_SORT_MEMORY )
{
/*
Use all of the given sort buffer for key data .
Allocate 1000 buffers at a start for new data . More buffers
will be allocated when needed .
*/
keys = memavl / ( sort_length + sizeof ( char * ) ) ;
maxbuffer = ( uint ) min ( ( ulonglong ) 1000 , ( records / keys ) + 1 ) ;
}
else
{
/*
All keys can ' t fit in memory .
Calculate how many keys + buffers we can keep in memory
*/
uint maxbuffer_org ;
do
{
skr = maxbuffer ;
if ( memavl < sizeof ( BUFFPEK ) * ( uint ) maxbuffer | |
( keys = ( memavl - sizeof ( BUFFPEK ) * ( uint ) maxbuffer ) /
maxbuffer_org = maxbuffer ;
if ( memavl < sizeof ( BUFFPEK ) * maxbuffer | |
( keys = ( memavl - sizeof ( BUFFPEK ) * maxbuffer ) /
( sort_length + sizeof ( char * ) ) ) < = 1 | |
keys < ( uint ) maxbuffer )
keys < maxbuffer )
{
_ma_check_print_error ( info - > sort_info - > param ,
" aria_sort_buffer_size is too small " ) ;
" aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u " ,
( ulonglong ) sortbuff_size , ( ulonglong ) records ,
sort_length ) ;
my_errno = ENOMEM ;
goto err ;
}
}
while ( ( maxbuffer = ( int ) ( records / ( keys - 1 ) + 1 ) ) ! = skr ) ;
while ( ( maxbuffer = ( uint ) ( records / ( keys - 1 ) + 1 ) ) ! = maxbuffer_org ) ;
}
if ( ( sort_keys = ( uchar * * ) my_malloc ( keys * ( sort_length + sizeof ( char * ) ) +
HA_FT_MAXBYTELEN , MYF ( 0 ) ) ) )
{
if ( my_init_dynamic_array ( & buffpek , sizeof ( BUFFPEK ) , maxbuffer ,
maxbuffer / 2 , MYF ( 0 ) ) )
min ( m axbuffer / 2 , 1000 ) , MYF ( 0 ) ) )
{
my_free ( sort_keys ) ;
sort_keys = 0 ;
@ -178,14 +203,20 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
}
if ( memavl < MIN_SORT_MEMORY )
{
_ma_check_print_error ( info - > sort_info - > param , " Aria sort buffer "
" too small " ) ; /* purecov: tested */
goto err ; /* purecov: tested */
/* purecov: begin inspected */
_ma_check_print_error ( info - > sort_info - > param ,
" aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u " ,
( ulonglong ) sortbuff_size , ( ulonglong ) records , sort_length ) ;
my_errno = ENOMEM ;
goto err ;
/* purecov: end inspected */
}
( * info - > lock_in_memory ) ( info - > sort_info - > param ) ; /* Everything is allocated */
if ( ! no_messages )
printf ( " - Searching for keys, allocating buffer for %d keys \n " , keys ) ;
my_fprintf ( stdout ,
" - Searching for keys, allocating buffer for %llu keys \n " ,
( ulonglong ) keys ) ;
if ( ( records = find_all_keys ( info , keys , sort_keys , & buffpek , & maxbuffer ,
& tempfile , & tempfile_for_exceptions ) )
@ -197,8 +228,8 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
if ( maxbuffer = = 0 )
{
if ( ! no_messages )
printf ( " - Dumping %lu keys \n " , ( ulong ) records ) ;
if ( write_index ( info , sort_keys , ( uint ) records ) )
my_f printf( stdout , " - Dumping %l lu keys \n " , ( ulong long ) records ) ;
if ( write_index ( info , sort_keys , ( ha_keys ) records ) )
goto err ; /* purecov: inspected */
}
else
@ -207,7 +238,8 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
if ( maxbuffer > = MERGEBUFF2 )
{
if ( ! no_messages )
printf ( " - Merging %lu keys \n " , ( ulong ) records ) ; /* purecov: tested */
my_fprintf ( stdout , " - Merging %llu keys \n " ,
( ulonglong ) records ) ; /* purecov: tested */
if ( merge_many_buff ( info , keys , sort_keys ,
dynamic_element ( & buffpek , 0 , BUFFPEK * ) , & maxbuffer , & tempfile ) )
goto err ; /* purecov: inspected */
@ -266,13 +298,13 @@ err:
/* Search after all keys and place them in a temp. file */
static ha_rows find_all_keys ( MARIA_SORT_PARAM * info , uint keys ,
static ha_rows find_all_keys ( MARIA_SORT_PARAM * info , ha_rows keys ,
uchar * * sort_keys , DYNAMIC_ARRAY * buffpek ,
int * maxbuffer , IO_CACHE * tempfile ,
u int * maxbuffer , IO_CACHE * tempfile ,
IO_CACHE * tempfile_for_exceptions )
{
int error ;
uint idx ;
ha_rows idx ;
DBUG_ENTER ( " find_all_keys " ) ;
idx = error = 0 ;
@ -328,9 +360,11 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
{
MARIA_SORT_PARAM * sort_param = ( MARIA_SORT_PARAM * ) arg ;
int error ;
size_t memavl , old_memavl ;
size_t memavl , old_memavl ;
longlong sortbuff_size ;
ha_keys keys , idx ;
uint sort_length ;
ulong idx , maxbuffer , keys ;
uint maxbuffer ;
uchar * * sort_keys = 0 ;
LINT_INIT ( keys ) ;
@ -364,8 +398,9 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
bzero ( ( char * ) & sort_param - > buffpek , sizeof ( sort_param - > buffpek ) ) ;
bzero ( ( char * ) & sort_param - > unique , sizeof ( sort_param - > unique ) ) ;
memavl = max ( sort_param - > sortbuff_size , MIN_SORT_MEMORY ) ;
idx = ( uint ) sort_param - > sort_info - > max_records ;
sortbuff_size = sort_param - > sortbuff_size ;
memavl = max ( sortbuff_size , MIN_SORT_MEMORY ) ;
idx = ( ha_keys ) sort_param - > sort_info - > max_records ;
sort_length = sort_param - > key_length ;
maxbuffer = 1 ;
@ -373,23 +408,36 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
{
if ( ( my_off_t ) ( idx + 1 ) * ( sort_length + sizeof ( char * ) ) < = ( my_off_t ) memavl )
keys = idx + 1 ;
else if ( ( sort_param - > sort_info - > param - > testflag &
( T_FORCE_SORT_MEMORY | T_CREATE_MISSING_KEYS ) ) = =
T_FORCE_SORT_MEMORY )
{
/*
Use all of the given sort buffer for key data .
Allocate 1000 buffers at a start for new data . More buffers
will be allocated when needed .
*/
keys = memavl / ( sort_length + sizeof ( char * ) ) ;
maxbuffer = ( uint ) min ( ( ulonglong ) 1000 , ( idx / keys ) + 1 ) ;
}
else
{
ulong skr ;
uint maxbuffer_org ;
do
{
skr = maxbuffer ;
maxbuffer_org = maxbuffer ;
if ( memavl < sizeof ( BUFFPEK ) * maxbuffer | |
( keys = ( memavl - sizeof ( BUFFPEK ) * maxbuffer ) /
( sort_length + sizeof ( char * ) ) ) < = 1 | |
keys < maxbuffer )
{
_ma_check_print_error ( sort_param - > sort_info - > param ,
" aria_sort_buffer_size is too small " ) ;
" aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u " ,
sortbuff_size , ( ulonglong ) idx , sort_length ) ;
goto err ;
}
}
while ( ( maxbuffer = ( int ) ( idx / ( keys - 1 ) + 1 ) ) ! = skr ) ;
while ( ( maxbuffer = ( u int) ( idx / ( keys - 1 ) + 1 ) ) ! = maxbuffer_org ) ;
}
if ( ( sort_keys = ( uchar * * )
my_malloc ( keys * ( sort_length + sizeof ( char * ) ) +
@ -397,7 +445,7 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
HA_FT_MAXBYTELEN : 0 ) , MYF ( 0 ) ) ) )
{
if ( my_init_dynamic_array ( & sort_param - > buffpek , sizeof ( BUFFPEK ) ,
maxbuffer , maxbuffer / 2 , MYF ( 0 ) ) )
maxbuffer , min ( m axbuffer / 2 , 1000 ) , MYF ( 0 ) ) )
{
my_free ( sort_keys ) ;
sort_keys = ( uchar * * ) NULL ; /* for err: label */
@ -412,14 +460,19 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
}
if ( memavl < MIN_SORT_MEMORY )
{
/* purecov: begin inspected */
_ma_check_print_error ( sort_param - > sort_info - > param ,
" Aria sort buffer too small " ) ;
goto err ; /* purecov: tested */
" aria_sort_buffer_size is too small. Current aria_sort_buffer_size: %llu rows: %llu sort_length: %u " ,
sortbuff_size , ( ulonglong ) idx , sort_length ) ;
my_errno = ENOMEM ;
goto err ;
/* purecov: end inspected */
}
if ( sort_param - > sort_info - > param - > testflag & T_VERBOSE )
printf ( " Key %d - Allocating buffer for %lu keys \n " ,
sort_param - > key + 1 , ( ulong ) keys ) ;
my_fprintf ( stdout ,
" Key %d - Allocating buffer for %llu keys \n " ,
sort_param - > key + 1 , ( ulonglong ) keys ) ;
sort_param - > sort_keys = sort_keys ;
idx = error = 0 ;
@ -431,7 +484,7 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
{
if ( sort_param - > real_key_length > sort_param - > key_length )
{
if ( write_key ( sort_param , sort_keys [ idx ] ,
if ( write_key ( sort_param , sort_keys [ idx ] ,
& sort_param - > tempfile_for_exceptions ) )
goto err ;
continue ;
@ -449,7 +502,7 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
( size_t ) sort_param - > key_length ) ;
idx = 1 ;
}
sort_keys [ idx ] = sort_keys [ idx - 1 ] + sort_param - > key_length ;
sort_keys [ idx ] = sort_keys [ idx - 1 ] + sort_param - > key_length ;
}
if ( error > 0 )
goto err ;
@ -465,14 +518,13 @@ pthread_handler_t _ma_thr_find_all_keys(void *arg)
else
sort_param - > keys = idx ;
sort_param - > sort_keys_length = keys ;
goto ok ;
err :
DBUG_PRINT ( " error " , ( " got some error " ) ) ;
sort_param - > sort_info - > got_error = 1 ; /* no need to protect with a mutex */
my_free ( sort_keys ) ;
sort_param - > sort_keys = 0 ;
sort_param - > sort_keys = 0 ;
delete_dynamic ( & sort_param - > buffpek ) ;
close_cached_file ( & sort_param - > tempfile ) ;
close_cached_file ( & sort_param - > tempfile_for_exceptions ) ;
@ -483,8 +535,12 @@ ok:
Detach from the share if the writer is involved . Avoid others to
be blocked . This includes a flush of the write buffer . This will
also indicate EOF to the readers .
That means that a writer always gets here first and readers -
only when they see EOF . But if a reader finishes prematurely
because of an error it may reach this earlier - don ' t allow it
to detach the writer thread .
*/
if ( sort_param - > sort_info - > info - > rec_cache . share )
if ( sort_param - > master & & sort_param - > sort_info - > info - > rec_cache . share )
remove_io_thread ( & sort_param - > sort_info - > info - > rec_cache ) ;
/* Readers detach from the share if any. Avoid others to be blocked. */
@ -518,7 +574,7 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
for ( i = 0 , sinfo = sort_param ;
i < sort_info - > total_keys ;
i + + , rec_per_key_part + = sinfo - > keyinfo - > keysegs , sinfo + + )
i + + , sinfo + + )
{
if ( ! sinfo - > sort_keys )
{
@ -534,19 +590,15 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
{
if ( param - > testflag & T_VERBOSE )
{
printf ( " Key %d - Dumping %u keys \n " , sinfo - > key + 1 , sinfo - > keys ) ;
my_fprintf ( stdout ,
" Key %d - Dumping %llu keys \n " , sinfo - > key + 1 ,
( ulonglong ) sinfo - > keys ) ;
fflush ( stdout ) ;
}
if ( write_index ( sinfo , sinfo - > sort_keys , sinfo - > keys ) | |
flush_maria_ft_buf ( sinfo ) | | _ma_flush_pending_blocks ( sinfo ) )
got_error = 1 ;
}
if ( ! got_error & & param - > testflag & T_STATISTICS )
maria_update_key_parts ( sinfo - > keyinfo , rec_per_key_part , sinfo - > unique ,
param - > stats_method = =
MI_STATS_METHOD_IGNORE_NULLS ?
sinfo - > notnull : NULL ,
( ulonglong ) share - > state . state . records ) ;
}
my_free ( sinfo - > sort_keys ) ;
my_free ( sinfo - > rec_buff ) ;
@ -559,6 +611,7 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
delete_dynamic ( & sinfo - > buffpek ) ,
close_cached_file ( & sinfo - > tempfile ) ,
close_cached_file ( & sinfo - > tempfile_for_exceptions ) ,
rec_per_key_part + = sinfo - > keyinfo - > keysegs ,
sinfo + + )
{
if ( got_error )
@ -597,10 +650,12 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
if ( maxbuffer > = MERGEBUFF2 )
{
if ( param - > testflag & T_VERBOSE )
printf ( " Key %d - Merging %u keys \n " , sinfo - > key + 1 , sinfo - > keys ) ;
if ( merge_many_buff ( sinfo , keys , ( uchar * * ) mergebuf ,
my_fprintf ( stdout ,
" Key %d - Merging %llu keys \n " ,
sinfo - > key + 1 , ( ulonglong ) sinfo - > keys ) ;
if ( merge_many_buff ( sinfo , keys , ( uchar * * ) mergebuf ,
dynamic_element ( & sinfo - > buffpek , 0 , BUFFPEK * ) ,
( int * ) & maxbuffer , & sinfo - > tempfile ) )
& maxbuffer , & sinfo - > tempfile ) )
{
got_error = 1 ;
continue ;
@ -660,6 +715,13 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
}
}
}
if ( ! got_error & & ( param - > testflag & T_STATISTICS ) )
maria_update_key_parts ( sinfo - > keyinfo , rec_per_key_part , sinfo - > unique ,
param - > stats_method = =
MI_STATS_METHOD_IGNORE_NULLS ?
sinfo - > notnull : NULL ,
( ulonglong ) share - > state . state . records ) ;
}
my_free ( mergebuf ) ;
DBUG_RETURN ( got_error ) ;
@ -669,12 +731,15 @@ int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
/* Write all keys in memory to file for later merge */
static int write_keys ( MARIA_SORT_PARAM * info , register uchar * * sort_keys ,
uint count , BUFFPEK * buffpek , IO_CACHE * tempfile )
ha_keys count , BUFFPEK * buffpek , IO_CACHE * tempfile )
{
uchar * * end ;
uint sort_length = info - > key_length ;
DBUG_ENTER ( " write_keys " ) ;
if ( ! buffpek )
DBUG_RETURN ( 1 ) ; /* Out of memory */
my_qsort2 ( ( uchar * ) sort_keys , count , sizeof ( uchar * ) , ( qsort2_cmp ) info - > key_cmp ,
info ) ;
if ( ! my_b_inited ( tempfile ) & &
@ -687,7 +752,7 @@ static int write_keys(MARIA_SORT_PARAM *info, register uchar **sort_keys,
for ( end = sort_keys + count ; sort_keys ! = end ; sort_keys + + )
{
if ( my_b_write ( tempfile , * sort_keys , ( uint ) sort_length ) )
if ( my_b_write ( tempfile , * sort_keys , sort_length ) )
DBUG_RETURN ( 1 ) ; /* purecov: inspected */
}
DBUG_RETURN ( 0 ) ;
@ -710,14 +775,17 @@ my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
static int write_keys_varlen ( MARIA_SORT_PARAM * info ,
register uchar * * sort_keys ,
uint count , BUFFPEK * buffpek ,
IO_CACHE * tempfile )
register uchar * * sort_keys ,
ha_keys count , BUFFPEK * buffpek ,
IO_CACHE * tempfile )
{
uchar * * end ;
int err ;
DBUG_ENTER ( " write_keys_varlen " ) ;
if ( ! buffpek )
DBUG_RETURN ( 1 ) ; /* Out of memory */
my_qsort2 ( ( uchar * ) sort_keys , count , sizeof ( uchar * ) , ( qsort2_cmp ) info - > key_cmp ,
info ) ;
if ( ! my_b_inited ( tempfile ) & &
@ -756,9 +824,8 @@ static int write_key(MARIA_SORT_PARAM *info, uchar *key,
/* Write index */
static int write_index ( MARIA_SORT_PARAM * info ,
register uchar * * sort_keys ,
register uint count )
static int write_index ( MARIA_SORT_PARAM * info , register uchar * * sort_keys ,
register ha_keys count )
{
DBUG_ENTER ( " write_index " ) ;
@ -777,11 +844,11 @@ static int write_index(MARIA_SORT_PARAM *info,
/* Merge buffers to make < MERGEBUFF2 buffers */
static int merge_many_buff ( MARIA_SORT_PARAM * info , uint keys ,
uchar * * sort_keys , BUFFPEK * buffpek ,
int * maxbuffer , IO_CACHE * t_file )
static int merge_many_buff ( MARIA_SORT_PARAM * info , ha_keys keys ,
uchar * * sort_keys , BUFFPEK * buffpek ,
u int * maxbuffer , IO_CACHE * t_file )
{
int tmp , merges , max_merges ;
u int tmp , merges , max_merges ;
IO_CACHE t_file2 , * from_file , * to_file , * temp ;
BUFFPEK * lastbuff ;
DBUG_ENTER ( " merge_many_buff " ) ;
@ -807,11 +874,11 @@ static int merge_many_buff(MARIA_SORT_PARAM *info, uint keys,
from_file = t_file ; to_file = & t_file2 ;
while ( * maxbuffer > = MERGEBUFF2 )
{
int i ;
u int i ;
reinit_io_cache ( from_file , READ_CACHE , 0L , 0 , 0 ) ;
reinit_io_cache ( to_file , WRITE_CACHE , 0L , 0 , 0 ) ;
lastbuff = buffpek ;
for ( i = 0 ; i < = * maxbuffer - MERGEBUFF * 3 / 2 ; i + = MERGEBUFF )
for ( i = 0 ; i + MERGEBUFF * 3 / 2 < = * maxbuffer ; i + = MERGEBUFF )
{
if ( merge_buffers ( info , keys , from_file , to_file , sort_keys , lastbuff + + ,
buffpek + i , buffpek + i + MERGEBUFF - 1 ) )
@ -825,14 +892,19 @@ static int merge_many_buff(MARIA_SORT_PARAM *info, uint keys,
if ( flush_io_cache ( to_file ) )
break ; /* purecov: inspected */
temp = from_file ; from_file = to_file ; to_file = temp ;
* maxbuffer = ( int ) ( lastbuff - buffpek ) - 1 ;
* maxbuffer = ( u int) ( lastbuff - buffpek ) - 1 ;
if ( info - > sort_info - > param - > max_stage ! = 1 ) /* If not parallel */
_ma_report_progress ( info - > sort_info - > param , merges + + , max_merges ) ;
}
cleanup :
close_cached_file ( to_file ) ; /* This holds old result */
if ( to_file = = t_file )
{
DBUG_ASSERT ( t_file2 . type = = WRITE_CACHE ) ;
* t_file = t_file2 ; /* Copy result file */
t_file - > current_pos = & t_file - > write_pos ;
t_file - > current_end = & t_file - > write_end ;
}
DBUG_RETURN ( * maxbuffer > = MERGEBUFF2 ) ; /* Return 1 if interrupted */
} /* merge_many_buff */
@ -851,33 +923,35 @@ cleanup:
- 1 Error
*/
static uin t read_to_buffer ( IO_CACHE * fromfile , BUFFPEK * buffpek ,
uint sort_length )
static my_off_ t read_to_buffer ( IO_CACHE * fromfile , BUFFPEK * buffpek ,
uint sort_length )
{
register uint count ;
uin t length ;
register ha_keys count ;
my_off_ t length ;
if ( ( count = ( uint ) min ( ( ha_rows ) buffpek - > max_keys , buffpek - > count ) ) )
if ( ( count = ( ha_keys ) min ( ( ha_rows ) buffpek - > max_keys , buffpek - > count ) ) )
{
if ( mysql_file_pread ( fromfile - > file , buffpek - > base ,
( length = sort_length * count ) , buffpek - > file_pos , MYF_RW ) )
return ( ( uint ) - 1 ) ; /* purecov: inspected */
if ( mysql_file_pread ( fromfile - > file , ( uchar * ) buffpek - > base ,
( length = sort_length * count ) ,
buffpek - > file_pos , MYF_RW ) )
return ( HA_OFFSET_ERROR ) ; /* purecov: inspected */
buffpek - > key = buffpek - > base ;
buffpek - > file_pos + = length ; /* New filepos */
buffpek - > count - = count ;
buffpek - > mem_count = count ;
}
return ( count * sort_length ) ;
return ( ( ( my_off_t ) count ) * sort_length ) ;
} /* read_to_buffer */
static uint read_to_buffer_varlen ( IO_CACHE * fromfile , BUFFPEK * buffpek ,
uint sort_length )
static my_off_t read_to_buffer_varlen ( IO_CACHE * fromfile , BUFFPEK * buffpek ,
uint sort_length )
{
register uint count ;
register ha_keys count ;
uint idx ;
uchar * buffp ;
if ( ( count = ( uint ) min ( ( ha_rows ) buffpek - > max_keys , buffpek - > count ) ) )
if ( ( count = ( ha_keys ) min ( ( ha_rows ) buffpek - > max_keys , buffpek - > count ) ) )
{
buffp = buffpek - > base ;
@ -886,7 +960,7 @@ static uint read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
uint16 length_of_key ;
if ( mysql_file_pread ( fromfile - > file , ( uchar * ) & length_of_key , sizeof ( length_of_key ) ,
buffpek - > file_pos , MYF_RW ) )
return ( ( uint ) - 1 ) ;
return ( HA_OFFSET_ERROR ) ;
buffpek - > file_pos + = sizeof ( length_of_key ) ;
if ( mysql_file_pread ( fromfile - > file , buffp , length_of_key ,
buffpek - > file_pos , MYF_RW ) )
@ -898,15 +972,15 @@ static uint read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
buffpek - > count - = count ;
buffpek - > mem_count = count ;
}
return ( count * sort_length ) ;
return ( ( ( my_off_t ) count ) * sort_length ) ;
} /* read_to_buffer_varlen */
static int write_merge_key_varlen ( MARIA_SORT_PARAM * info ,
IO_CACHE * to_file , uchar * key ,
uint sort_length , uint count )
uint sort_length , ha_keys count )
{
uint idx ;
ha_keys idx ;
uchar * bufs = key ;
for ( idx = 1 ; idx < = count ; idx + + )
@ -921,34 +995,40 @@ static int write_merge_key_varlen(MARIA_SORT_PARAM *info,
static int write_merge_key ( MARIA_SORT_PARAM * info __attribute__ ( ( unused ) ) ,
IO_CACHE * to_file , uchar * key ,
uint sort_length , uint count )
IO_CACHE * to_file , uchar * key ,
uint sort_length , ha_keys count )
{
return my_b_write ( to_file , key , ( size_t ) sort_length * count ) ;
return my_b_write ( to_file , key , ( ( size_t ) sort_length ) * count ) ;
}
/*
Merge buffers to one buffer
If to_file = = 0 then use info - > key_write
Return :
0 ok
1 error
*/
static int
merge_buffers ( MARIA_SORT_PARAM * info , uint keys , IO_CACHE * from_file ,
merge_buffers ( MARIA_SORT_PARAM * info , ha_keys keys , IO_CACHE * from_file ,
IO_CACHE * to_file , uchar * * sort_keys , BUFFPEK * lastbuff ,
BUFFPEK * Fb , BUFFPEK * Tb )
{
int error ;
uint sort_length , maxcount ;
int error = 1 ;
uint sort_length ;
ha_keys maxcount ;
ha_rows count ;
my_off_t UNINIT_VAR ( to_start_filepos ) ;
my_off_t UNINIT_VAR ( to_start_filepos ) , read_length ;
uchar * strpos ;
BUFFPEK * buffpek , * * refpek ;
QUEUE queue ;
DBUG_ENTER ( " merge_buffers " ) ;
count = error = 0 ;
maxcount = keys / ( ( uint ) ( Tb - Fb ) + 1 ) ;
count = 0 ;
maxcount = keys / ( ( uint ) ( Tb - Fb ) + 1 ) ;
DBUG_ASSERT ( maxcount > 0 ) ;
LINT_INIT ( to_start_filepos ) ;
if ( to_file )
to_start_filepos = my_b_tell ( to_file ) ;
strpos = ( uchar * ) sort_keys ;
@ -963,10 +1043,10 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
{
count + = buffpek - > count ;
buffpek - > base = strpos ;
buffpek - > max_keys = maxcount ;
strpos + = ( uint ) ( error = ( int ) info - > read_to_buffer ( from_file , buffpek ,
sort_length ) ) ;
if ( error = = - 1 )
buffpek - > max_keys = maxcount ;
strpos + = ( read_length = info - > read_to_buffer ( from_file , buffpek ,
sort_length ) ) ;
if ( read_length = = HA_OFFSET_ERROR )
goto err ; /* purecov: inspected */
queue_insert ( & queue , ( uchar * ) buffpek ) ;
}
@ -980,27 +1060,20 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
{
if ( info - > write_key ( info , to_file , buffpek - > key ,
( uint ) sort_length , 1 ) )
{
error = 1 ; goto err ; /* purecov: inspected */
}
goto err ; /* purecov: inspected */
}
else
{
if ( ( * info - > key_write ) ( info , ( void * ) buffpek - > key ) )
{
error = 1 ; goto err ; /* purecov: inspected */
}
goto err ; /* purecov: inspected */
}
buffpek - > key + = sort_length ;
if ( ! - - buffpek - > mem_count )
{
/* It's enough to check for killedptr before a slow operation */
if ( _ma_killed_ptr ( info - > sort_info - > param ) )
{
error = 1 ;
goto err ;
}
if ( ! ( error = ( int ) info - > read_to_buffer ( from_file , buffpek , sort_length ) ) )
if ( ! ( read_length = info - > read_to_buffer ( from_file , buffpek , sort_length ) ) )
{
uchar * base = buffpek - > base ;
uint max_keys = buffpek - > max_keys ;
@ -1027,9 +1100,9 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
}
break ; /* One buffer have been removed */
}
else if ( read_length = = HA_OFFSET_ERROR )
goto err ; /* purecov: inspected */
}
else if ( error = = - 1 )
goto err ; /* purecov: inspected */
queue_replace_top ( & queue ) ; /* Top element has been replaced */
}
}
@ -1061,8 +1134,9 @@ merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
}
}
}
while ( ( error = ( int ) info - > read_to_buffer ( from_file , buffpek , sort_length ) ) ! =
- 1 & & error ! = 0 ) ;
while ( ( read_length = info - > read_to_buffer ( from_file , buffpek , sort_length ) ) ! = HA_OFFSET_ERROR & & read_length ! = 0 ) ;
if ( read_length = = 0 )
error = 0 ;
lastbuff - > count = count ;
if ( to_file )
@ -1076,8 +1150,8 @@ err:
/* Do a merge to output-file (save only positions) */
static int
merge_index ( MARIA_SORT_PARAM * info , uint keys , uchar * * sort_keys ,
BUFFPEK * buffpek , int maxbuffer , IO_CACHE * tempfile )
merge_index ( MARIA_SORT_PARAM * info , ha_keys keys , uchar * * sort_keys ,
BUFFPEK * buffpek , u int maxbuffer , IO_CACHE * tempfile )
{
DBUG_ENTER ( " merge_index " ) ;
if ( merge_buffers ( info , keys , tempfile , ( IO_CACHE * ) 0 , sort_keys , buffpek , buffpek ,