Browse Source

MDEV-36234: Add innodb_linux_aio

This controls which linux implementation to use for
innodb_use_native_aio=ON.

innodb_linux_aio=auto is equivalent to innodb_linux_aio=io_uring when
it is available, and falls back to innodb_linux_aio=aio when not.

Debian packaging is no longer an exclusive choice between aio and uring, so
for those older Debian or Ubuntu releases, it's a remove_uring directive.
For more recent releases, add mandatory liburing for consistent packaging.

WITH_LIBAIO is now an independent option from WITH_URING.

LINUX_NATIVE_AIO preprocessor constant is renamed to HAVE_LIBAIO,
analogous to existing HAVE_URING.

tpool::is_aio_supported(): A common feature check.

is_linux_native_aio_supported(): Remove. This had originally been added in
mysql/mysql-server@0da310b69db3a39ba2e6d63ff62ef3c027cd63ff in 2012
to fix an issue where io_submit() on CentOS 5.5 would return EINVAL
for a /tmp/#sql*.ibd file associated with CREATE TEMPORARY TABLE.
But, starting with commit 2e814d4702 InnoDB
temporary tables will be written to innodb_temp_data_file_path.
The 2012 commit said that the error could occur on "old kernels".
Any GNU/Linux distribution that we currently support should be based
on a newer Linux kernel; for example, Red Hat Enterprise Linux 7
was released in 2014.

tpool::create_linux_aio(): Wraps the Linux implementations:
create_libaio() and create_liburing(), each defined in separate
compilation units (aio_linux.cc, aio_libaio.cc, aio_liburing.cc).

The CMake definitions are simplified using target_sources() and
target_compile_definitions(), all available since CMake 2.8.12.
With this change, there is no need to include ${CMAKE_SOURCE_DIR}/tpool
or add TPOOL_DEFINES flags anymore, target_link_libraries(lib tpool)
does all that.

This is joint work with Daniel Black and Vladislav Vaintroub.
pull/3976/head
Marko Mäkelä 4 months ago
parent
commit
a87bb96ecb
  1. 3
      cmake/plugin.cmake
  2. 13
      debian/autobake-deb.sh
  3. 5
      debian/rules
  4. 2
      extra/mariabackup/CMakeLists.txt
  5. 40
      extra/mariabackup/xtrabackup.cc
  6. 1
      libmysqld/CMakeLists.txt
  7. 2
      mysql-test/mariadb-test-run.pl
  8. 21
      mysql-test/suite/sys_vars/r/innodb_linux_aio_basic.result
  9. 1
      mysql-test/suite/sys_vars/r/sysvars_innodb.result
  10. 23
      mysql-test/suite/sys_vars/t/innodb_linux_aio_basic.test
  11. 2
      mysql-test/suite/sys_vars/t/innodb_read_io_threads_basic.test
  12. 1
      mysql-test/suite/sys_vars/t/sysvars_innodb.test
  13. 1
      sql/CMakeLists.txt
  14. 9
      storage/innobase/CMakeLists.txt
  15. 39
      storage/innobase/handler/ha_innodb.cc
  16. 13
      storage/innobase/include/fil0fil.h
  17. 6
      storage/innobase/include/srv0srv.h
  18. 184
      storage/innobase/os/os0file.cc
  19. 4
      storage/innobase/srv/srv0srv.cc
  20. 13
      storage/innobase/srv/srv0start.cc
  21. 61
      tpool/CMakeLists.txt
  22. 193
      tpool/aio_libaio.cc
  23. 18
      tpool/aio_liburing.cc
  24. 211
      tpool/aio_linux.cc
  25. 1
      tpool/aio_simulated.cc
  26. 1
      tpool/aio_win.cc
  27. 65
      tpool/tpool.h
  28. 25
      tpool/tpool_generic.cc
  29. 7
      tpool/tpool_win.cc

3
cmake/plugin.cmake

@ -161,6 +161,9 @@ MACRO(MYSQL_ADD_PLUGIN)
PROPERTIES COMPILE_DEFINITIONS "EMBEDDED_LIBRARY${version_string}")
ENDIF()
ADD_DEPENDENCIES(${target}_embedded GenError ${ARG_DEPENDS})
IF(ARG_LINK_LIBRARIES)
TARGET_LINK_LIBRARIES (${target}_embedded ${ARG_LINK_LIBRARIES})
ENDIF()
ENDIF()
ENDIF()

13
debian/autobake-deb.sh

@ -64,11 +64,10 @@ add_lsb_base_depends()
sed -e 's#lsof #lsb-base (>= 3.0-10),\n lsof #' -i debian/control
}
replace_uring_with_aio()
remove_uring()
{
sed 's/liburing-dev/libaio-dev/g' -i debian/control
sed -e '/-DIGNORE_AIO_CHECK=ON/d' \
-e '/-DWITH_URING=ON/d' -i debian/rules
sed -e '/liburing-dev/d' -i debian/control
sed -e '/-DWITH_URING=ON/d' -i debian/rules
}
disable_libfmt()
@ -116,7 +115,7 @@ in
# Debian
"buster")
disable_libfmt
replace_uring_with_aio
remove_uring
;&
"bullseye")
add_lsb_base_depends
@ -127,7 +126,7 @@ in
# so no removal is necessary.
if [[ ! "$architecture" =~ amd64|arm64|armel|armhf|i386|mips64el|mipsel|ppc64el|s390x ]]
then
replace_uring_with_aio
remove_uring
fi
;&
"trixie"|"sid")
@ -136,8 +135,8 @@ in
;;
# Ubuntu
"focal")
replace_uring_with_aio
disable_libfmt
remove_uring
;&
"jammy"|"kinetic")
add_lsb_base_depends

5
debian/rules

@ -87,9 +87,6 @@ endif
# quality standards in Debian. Also building it requires an extra 4 GB of disk
# space which makes native Debian builds fail as the total disk space needed
# for MariaDB becomes over 10 GB. Only build CS via autobake-deb.sh.
#
# Note: Don't use '-DWITH_URING=ON' as some Buildbot builders are missing it
# and would fail permanently.
PATH=$${MYSQL_BUILD_PATH:-"/usr/lib/ccache:/usr/local/bin:/usr/bin:/bin"} \
dh_auto_configure --builddirectory=$(BUILDDIR) -- \
-DCMAKE_BUILD_TYPE=RelWithDebInfo \
@ -103,6 +100,8 @@ endif
-DPLUGIN_AWS_KEY_MANAGEMENT=NO \
-DPLUGIN_COLUMNSTORE=NO \
-DIGNORE_AIO_CHECK=ON \
-DWITH_URING=ON \
-DWITH_LIBAIO=ON \
-DDEB=$(DEB_VENDOR)
# This is needed, otherwise 'make test' will run before binaries have been built

2
extra/mariabackup/CMakeLists.txt

@ -107,6 +107,8 @@ MYSQL_ADD_EXECUTABLE(mbstream
TARGET_LINK_LIBRARIES(mbstream
mysys
)
TARGET_INCLUDE_DIRECTORIES(mbstream PRIVATE ${PROJECT_SOURCE_DIR}/tpool)
ADD_DEPENDENCIES(mbstream GenError)
IF(MSVC)

40
extra/mariabackup/xtrabackup.cc

@ -378,6 +378,10 @@ extern const char *innodb_checksum_algorithm_names[];
extern TYPELIB innodb_checksum_algorithm_typelib;
extern const char *innodb_flush_method_names[];
extern TYPELIB innodb_flush_method_typelib;
#ifdef __linux__
extern const char *innodb_linux_aio_names[];
extern TYPELIB innodb_linux_aio_typelib;
#endif
static const char *binlog_info_values[] = {"off", "lockless", "on", "auto",
NullS};
@ -1334,6 +1338,9 @@ enum options_xtrabackup
OPT_INNODB_READ_IO_THREADS,
OPT_INNODB_WRITE_IO_THREADS,
OPT_INNODB_USE_NATIVE_AIO,
#ifdef __linux__
OPT_INNODB_LINUX_AIO,
#endif
OPT_INNODB_PAGE_SIZE,
OPT_INNODB_BUFFER_POOL_FILENAME,
OPT_INNODB_LOCK_WAIT_TIMEOUT,
@ -1934,6 +1941,14 @@ struct my_option xb_server_options[] =
(G_PTR*) &srv_use_native_aio,
(G_PTR*) &srv_use_native_aio, 0, GET_BOOL, NO_ARG,
TRUE, 0, 0, 0, 0, 0},
#ifdef __linux__
{"innodb_linux_aio", OPT_INNODB_LINUX_AIO,
"Which linux AIO implementation to use, auto (io_uring, failing to aio) or explicit",
(G_PTR*) &srv_linux_aio_method,
(G_PTR*) &srv_linux_aio_method,
&innodb_linux_aio_typelib, GET_ENUM, REQUIRED_ARG,
SRV_LINUX_AIO_AUTO, 0, 0, 0, 0, 0},
#endif
{"innodb_page_size", OPT_INNODB_PAGE_SIZE,
"The universal page size of the database.",
(G_PTR*) &innobase_page_size, (G_PTR*) &innobase_page_size, 0,
@ -2529,26 +2544,7 @@ static bool innodb_init_param()
ut_ad(DATA_MYSQL_BINARY_CHARSET_COLL == my_charset_bin.number);
#ifdef _WIN32
srv_use_native_aio = TRUE;
#elif defined(LINUX_NATIVE_AIO)
if (srv_use_native_aio) {
msg("InnoDB: Using Linux native AIO");
}
#elif defined(HAVE_URING)
if (srv_use_native_aio) {
msg("InnoDB: Using liburing");
}
#else
/* Currently native AIO is supported only on windows and linux
and that also when the support is compiled in. In all other
cases, we ignore the setting of innodb_use_native_aio. */
srv_use_native_aio = FALSE;
#endif
srv_use_native_aio= tpool::supports_native_aio();
/* Assign the default value to srv_undo_dir if it's not specified, as
my_getopt does not support default values for string options. We also
@ -2583,9 +2579,6 @@ static bool innodb_init_param()
}
}
#ifdef _WIN32
srv_use_native_aio = TRUE;
#endif
return false;
error:
@ -5473,7 +5466,6 @@ fail:
xb_fil_io_init();
if (os_aio_init()) {
msg("Error: cannot initialize AIO subsystem");
goto fail;
}

1
libmysqld/CMakeLists.txt

@ -21,7 +21,6 @@ INCLUDE_DIRECTORIES(
${CMAKE_SOURCE_DIR}/include
${CMAKE_SOURCE_DIR}/libmysqld
${CMAKE_SOURCE_DIR}/sql
${CMAKE_SOURCE_DIR}/tpool
${CMAKE_BINARY_DIR}/sql
${PCRE_INCLUDE_DIRS}
${LIBFMT_INCLUDE_DIR}

2
mysql-test/mariadb-test-run.pl

@ -4548,7 +4548,7 @@ sub extract_warning_lines ($$) {
qr|InnoDB: io_setup\(\) attempt|,
qr|InnoDB: io_setup\(\) failed with EAGAIN|,
qr|io_uring_queue_init\(\) failed with|,
qr|InnoDB: liburing disabled|,
qr|InnoDB: io_uring failed: falling back to libaio|,
qr/InnoDB: Failed to set O_DIRECT on file/,
qr|setrlimit could not change the size of core files to 'infinity';|,
qr|failed to retrieve the MAC address|,

21
mysql-test/suite/sys_vars/r/innodb_linux_aio_basic.result

@ -0,0 +1,21 @@
select @@global.innodb_linux_aio;
@@global.innodb_linux_aio
auto
select @@session.innodb_linux_aio;
ERROR HY000: Variable 'innodb_linux_aio' is a GLOBAL variable
show global variables like 'innodb_linux_aio';
Variable_name Value
innodb_linux_aio auto
show session variables like 'innodb_linux_aio';
Variable_name Value
innodb_linux_aio auto
select * from information_schema.global_variables where variable_name='innodb_linux_aio';
VARIABLE_NAME VARIABLE_VALUE
INNODB_LINUX_AIO auto
select * from information_schema.session_variables where variable_name='innodb_linux_aio';
VARIABLE_NAME VARIABLE_VALUE
INNODB_LINUX_AIO auto
set global innodb_linux_aio='auto';
ERROR HY000: Variable 'innodb_linux_aio' is a read only variable
set session innodb_linux_aio='aio';
ERROR HY000: Variable 'innodb_linux_aio' is a read only variable

1
mysql-test/suite/sys_vars/r/sysvars_innodb.result

@ -5,6 +5,7 @@ variable_name not in (
'innodb_evict_tables_on_commit_debug', # one may want to override this
'innodb_use_native_aio', # default value depends on OS
'innodb_log_file_buffering', # only available on Linux and Windows
'innodb_linux_aio', # existence depends on OS
'innodb_buffer_pool_load_pages_abort') # debug build only, and is only for testing
order by variable_name;
VARIABLE_NAME INNODB_ADAPTIVE_FLUSHING

23
mysql-test/suite/sys_vars/t/innodb_linux_aio_basic.test

@ -0,0 +1,23 @@
--source include/have_innodb.inc
--source include/linux.inc
# enum readonly
#
# show values;
#
select @@global.innodb_linux_aio;
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
select @@session.innodb_linux_aio;
show global variables like 'innodb_linux_aio';
show session variables like 'innodb_linux_aio';
select * from information_schema.global_variables where variable_name='innodb_linux_aio';
select * from information_schema.session_variables where variable_name='innodb_linux_aio';
#
# show that it's read-only
#
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
set global innodb_linux_aio='auto';
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
set session innodb_linux_aio='aio';

2
mysql-test/suite/sys_vars/t/innodb_read_io_threads_basic.test

@ -34,5 +34,7 @@ set global innodb_read_io_threads=64;
select @@innodb_read_io_threads;
--error ER_GLOBAL_VARIABLE
set session innodb_read_io_threads=1;
--disable_warnings
set global innodb_read_io_threads=@n;
--enable_warnings

1
mysql-test/suite/sys_vars/t/sysvars_innodb.test

@ -16,5 +16,6 @@ select VARIABLE_NAME, SESSION_VALUE, DEFAULT_VALUE, VARIABLE_SCOPE, VARIABLE_TYP
'innodb_evict_tables_on_commit_debug', # one may want to override this
'innodb_use_native_aio', # default value depends on OS
'innodb_log_file_buffering', # only available on Linux and Windows
'innodb_linux_aio', # existence depends on OS
'innodb_buffer_pool_load_pages_abort') # debug build only, and is only for testing
order by variable_name;

1
sql/CMakeLists.txt

@ -59,7 +59,6 @@ ${PCRE_INCLUDE_DIRS}
${ZLIB_INCLUDE_DIRS}
${SSL_INCLUDE_DIRS}
${CMAKE_BINARY_DIR}/sql
${CMAKE_SOURCE_DIR}/tpool
)
ADD_CUSTOM_COMMAND(

9
storage/innobase/CMakeLists.txt

@ -131,7 +131,6 @@ ENDIF()
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/innobase/include
${CMAKE_SOURCE_DIR}/storage/innobase/handler
${CMAKE_SOURCE_DIR}/libbinlogevents/include)
INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/tpool)
SET(INNOBASE_SOURCES
btr/btr0btr.cc
@ -445,14 +444,16 @@ MYSQL_ADD_PLUGIN(innobase ${INNOBASE_SOURCES} STORAGE_ENGINE
${ZLIB_LIBRARIES}
${NUMA_LIBRARY}
${LIBSYSTEMD}
${LINKER_SCRIPT})
${LINKER_SCRIPT}
tpool
)
IF(NOT TARGET innobase)
RETURN()
ENDIF()
ADD_DEFINITIONS(${SSL_DEFINES} ${TPOOL_DEFINES})
ADD_DEFINITIONS(${SSL_DEFINES})
# A GCC bug causes crash when compiling these files on ARM64 with -O1+
# Compile them with -O0 as a workaround.
IF(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64"
@ -541,7 +542,7 @@ IF(INNODB_ENABLE_XAP_UNLOCK_UNMODIFIED_FOR_PRIMARY)
ENDIF()
IF(NOT (PLUGIN_INNOBASE STREQUAL DYNAMIC))
TARGET_LINK_LIBRARIES(innobase tpool mysys)
TARGET_LINK_LIBRARIES(innobase mysys)
ADD_SUBDIRECTORY(${CMAKE_SOURCE_DIR}/extra/mariabackup ${CMAKE_BINARY_DIR}/extra/mariabackup)
ENDIF()

39
storage/innobase/handler/ha_innodb.cc

@ -315,6 +315,25 @@ static TYPELIB innodb_stats_method_typelib = {
NULL
};
/** Possible values for system variable "innodb_linux_aio" */
#ifdef __linux__
/* Names accepted for innodb_linux_aio. The table is indexed by
srv_linux_aio_t: the position of each name must equal the value of the
enumerator noted beside it. The NullS entry terminates the list. */
const char* innodb_linux_aio_names[] = {
"auto", /* SRV_LINUX_AIO_AUTO */
"io_uring", /* SRV_LINUX_AIO_IO_URING */
"aio", /* SRV_LINUX_AIO_LIBAIO */
NullS
};
/** Used to define an enumerate type of the system variable
innodb_linux_aio. Used by mariadb-backup too. */
TYPELIB innodb_linux_aio_typelib = {
array_elements(innodb_linux_aio_names) - 1,
"innodb_linux_aio_typelib",
innodb_linux_aio_names,
NULL
};
#endif
/** Possible values of the parameter innodb_checksum_algorithm */
const char* innodb_checksum_algorithm_names[] = {
"crc32",
@ -4059,12 +4078,8 @@ static int innodb_init_params()
log_sys.log_buffered= true;
#endif
#if !defined LINUX_NATIVE_AIO && !defined HAVE_URING && !defined _WIN32
/* Currently native AIO is supported only on windows and linux
and that also when the support is compiled in. In all other
cases, we ignore the setting of innodb_use_native_aio. */
srv_use_native_aio= FALSE;
#endif
if (!tpool::supports_native_aio())
srv_use_native_aio= FALSE;
#ifdef _WIN32
switch (srv_file_flush_method) {
@ -19664,6 +19679,15 @@ static MYSQL_SYSVAR_BOOL(use_native_aio, srv_use_native_aio,
"Use native AIO if supported on this platform.",
NULL, NULL, TRUE);
#ifdef __linux__
static MYSQL_SYSVAR_ENUM(linux_aio, srv_linux_aio_method,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"Specifies which Linux AIO implementation should be used."
" Possible value are \"auto\" (default) to select io_uring"
" and fallback to aio, or explicit \"io_uring\" or \"aio\"",
nullptr, nullptr, SRV_LINUX_AIO_AUTO, &innodb_linux_aio_typelib);
#endif
#ifdef HAVE_LIBNUMA
static MYSQL_SYSVAR_BOOL(numa_interleave, srv_numa_interleave,
PLUGIN_VAR_NOCMDARG | PLUGIN_VAR_READONLY,
@ -20059,6 +20083,9 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(tmpdir),
MYSQL_SYSVAR(autoinc_lock_mode),
MYSQL_SYSVAR(use_native_aio),
#ifdef __linux__
MYSQL_SYSVAR(linux_aio),
#endif
#ifdef HAVE_LIBNUMA
MYSQL_SYSVAR(numa_interleave),
#endif /* HAVE_LIBNUMA */

13
storage/innobase/include/fil0fil.h

@ -77,6 +77,19 @@ enum srv_flush_t
#endif
};
/** Possible values of innodb_linux_aio */
#ifdef __linux__
/** Linux AIO implementation selected by innodb_linux_aio.
The enumerators are listed in the same order as innodb_linux_aio_names[].
os_aio_init() casts the configured value to tpool::aio_implementation
after compile-time checks that the corresponding enumerators are equal,
so the order here must be kept in sync with both. */
enum srv_linux_aio_t
{
/** auto, io_uring first and then aio */
SRV_LINUX_AIO_AUTO,
/** io_uring */
SRV_LINUX_AIO_IO_URING,
/** aio (libaio interface) */
SRV_LINUX_AIO_LIBAIO
};
#endif
/** innodb_flush_method */
extern ulong srv_file_flush_method;

6
storage/innobase/include/srv0srv.h

@ -178,6 +178,12 @@ OS (provided we compiled Innobase with it in), otherwise we will
use simulated aio.
Currently we support native aio on windows and linux */
extern my_bool srv_use_native_aio;
#ifdef __linux__
/* Selects which Linux native AIO method to use (innodb_linux_aio) */
extern ulong srv_linux_aio_method;
#endif
extern my_bool srv_numa_interleave;
/* Use atomic writes i.e disable doublewrite buffer */

184
storage/innobase/os/os0file.cc

@ -52,10 +52,6 @@ Created 10/21/1995 Heikki Tuuri
#include <tpool_structs.h>
#ifdef LINUX_NATIVE_AIO
#include <libaio.h>
#endif /* LINUX_NATIVE_AIO */
#ifdef HAVE_FALLOC_PUNCH_HOLE_AND_KEEP_SIZE
# include <fcntl.h>
# include <linux/falloc.h>
@ -3079,132 +3075,6 @@ static void write_io_callback(void *c)
write_slots->release(cb);
}
#ifdef LINUX_NATIVE_AIO
/** Checks if the system supports native linux aio. On some kernel
versions where native aio is supported it won't work on tmpfs. In such
cases we can't use native aio.
@return: true if supported, false otherwise. */
static bool is_linux_native_aio_supported()
{
File fd;
io_context_t io_ctx;
std::string log_file_path = get_log_file_path();
memset(&io_ctx, 0, sizeof(io_ctx));
if (io_setup(1, &io_ctx)) {
/* The platform does not support native aio. */
return(false);
}
else if (!srv_read_only_mode) {
/* Now check if tmpdir supports native aio ops. */
fd = mysql_tmpfile("ib");
if (fd < 0) {
ib::warn()
<< "Unable to create temp file to check"
" native AIO support.";
int ret = io_destroy(io_ctx);
ut_a(ret != -EINVAL);
ut_ad(ret != -EFAULT);
return(false);
}
}
else {
fd = my_open(log_file_path.c_str(), O_RDONLY | O_CLOEXEC,
MYF(0));
if (fd == -1) {
ib::warn() << "Unable to open \"" << log_file_path
<< "\" to check native"
<< " AIO read support.";
int ret = io_destroy(io_ctx);
ut_a(ret != EINVAL);
ut_ad(ret != EFAULT);
return(false);
}
}
struct io_event io_event;
memset(&io_event, 0x0, sizeof(io_event));
byte* ptr = static_cast<byte*>(aligned_malloc(srv_page_size,
srv_page_size));
struct iocb iocb;
/* Suppress valgrind warning. */
memset(ptr, 0, srv_page_size);
memset(&iocb, 0x0, sizeof(iocb));
struct iocb* p_iocb = &iocb;
if (!srv_read_only_mode) {
io_prep_pwrite(p_iocb, fd, ptr, srv_page_size, 0);
}
else {
ut_a(srv_page_size >= 512);
io_prep_pread(p_iocb, fd, ptr, 512, 0);
}
int err = io_submit(io_ctx, 1, &p_iocb);
if (err >= 1) {
/* Now collect the submitted IO request. */
err = io_getevents(io_ctx, 1, 1, &io_event, NULL);
}
aligned_free(ptr);
my_close(fd, MYF(MY_WME));
switch (err) {
case 1:
{
int ret = io_destroy(io_ctx);
ut_a(ret != -EINVAL);
ut_ad(ret != -EFAULT);
return(true);
}
case -EINVAL:
case -ENOSYS:
ib::warn()
<< "Linux Native AIO not supported. You can either"
" move "
<< (srv_read_only_mode ? log_file_path : "tmpdir")
<< " to a file system that supports native"
" AIO or you can set innodb_use_native_aio to"
" FALSE to avoid this message.";
/* fall through. */
default:
ib::warn()
<< "Linux Native AIO check on "
<< (srv_read_only_mode ? log_file_path : "tmpdir")
<< "returned error[" << -err << "]";
}
int ret = io_destroy(io_ctx);
ut_a(ret != -EINVAL);
ut_ad(ret != -EFAULT);
return(false);
}
#endif
int os_aio_init() noexcept
{
int max_write_events= int(srv_n_write_io_threads *
@ -3212,41 +3082,41 @@ int os_aio_init() noexcept
int max_read_events= int(srv_n_read_io_threads *
OS_AIO_N_PENDING_IOS_PER_THREAD);
int max_events= max_read_events + max_write_events;
int ret;
#if LINUX_NATIVE_AIO
if (srv_use_native_aio && !is_linux_native_aio_supported())
goto disable;
#endif
ret= srv_thread_pool->configure_aio(srv_use_native_aio, max_events);
int ret= 1;
#ifdef LINUX_NATIVE_AIO
if (ret)
if (srv_use_native_aio)
{
ut_ad(srv_use_native_aio);
disable:
ib::warn() << "Linux Native AIO disabled.";
srv_use_native_aio= false;
ret= srv_thread_pool->configure_aio(false, max_events);
}
tpool::aio_implementation aio_impl= tpool::OS_IO_DEFAULT;
#ifdef __linux__
compile_time_assert(SRV_LINUX_AIO_IO_URING == (srv_linux_aio_t)tpool::OS_IO_URING);
compile_time_assert(SRV_LINUX_AIO_LIBAIO == (srv_linux_aio_t) tpool::OS_IO_LIBAIO);
compile_time_assert(SRV_LINUX_AIO_AUTO == (srv_linux_aio_t) tpool::OS_IO_DEFAULT);
aio_impl=(tpool::aio_implementation) srv_linux_aio_method;
#endif
#ifdef HAVE_URING
if (ret)
{
ut_ad(srv_use_native_aio);
ib::warn()
<< "liburing disabled: falling back to innodb_use_native_aio=OFF";
srv_use_native_aio= false;
ret= srv_thread_pool->configure_aio(false, max_events);
ret= srv_thread_pool->configure_aio(srv_use_native_aio, max_events,
aio_impl);
if (ret)
{
srv_use_native_aio= false;
sql_print_warning("InnoDB: native AIO failed: falling back to"
" innodb_use_native_aio=OFF");
}
else
sql_print_information("InnoDB: Using %s", srv_thread_pool
->get_aio_implementation());
}
#endif
if (ret)
ret= srv_thread_pool->configure_aio(false, max_events,
tpool::OS_IO_DEFAULT);
if (!ret)
{
read_slots= new io_slots(max_read_events, srv_n_read_io_threads);
write_slots= new io_slots(max_write_events, srv_n_write_io_threads);
}
else
sql_print_error("InnoDB: Cannot initialize AIO sub-system");
return ret;
}
@ -3285,8 +3155,8 @@ int os_aio_resize(ulint n_reader_threads, ulint n_writer_threads) noexcept
int max_write_events= int(n_writer_threads * OS_AIO_N_PENDING_IOS_PER_THREAD);
int events= max_read_events + max_write_events;
/** Do the Linux AIO dance (this will try to create a new
io context with changed max_events ,etc*/
/* Do the Linux AIO dance (this will try to create a new
io context with changed max_events, etc.) */
int ret= srv_thread_pool->reconfigure_aio(srv_use_native_aio, events);

4
storage/innobase/srv/srv0srv.cc

@ -137,6 +137,10 @@ OS (provided we compiled Innobase with it in), otherwise we will
use simulated aio we build below with threads.
Currently we support native aio on windows and linux */
my_bool srv_use_native_aio;
#ifdef __linux__
/* Selects which Linux native AIO method to use (innodb_linux_aio) */
ulong srv_linux_aio_method;
#endif
my_bool srv_numa_interleave;
/** copy of innodb_use_atomic_writes; @see innodb_init_params() */
my_bool srv_use_atomic_writes;

13
storage/innobase/srv/srv0start.cc

@ -1287,22 +1287,9 @@ dberr_t srv_start(bool create_new_db)
}
if (os_aio_init()) {
ib::error() << "Cannot initialize AIO sub-system";
return(srv_init_abort(DB_ERROR));
}
#ifdef LINUX_NATIVE_AIO
if (srv_use_native_aio) {
ib::info() << "Using Linux native AIO";
}
#endif
#ifdef HAVE_URING
if (srv_use_native_aio) {
ib::info() << "Using liburing";
}
#endif
fil_system.create(srv_file_per_table ? 50000 : 5000);
if (buf_pool.create()) {

61
tpool/CMakeLists.txt

@ -1,22 +1,36 @@
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/include)
ADD_LIBRARY(tpool STATIC
aio_simulated.cc
tpool_structs.h
CMakeLists.txt
tpool.h
tpool_generic.cc
task_group.cc
task.cc
wait_notification.cc
)
TARGET_INCLUDE_DIRECTORIES(tpool PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}"
PRIVATE ${PROJECT_SOURCE_DIR}/include)
IF(WIN32)
SET(EXTRA_SOURCES tpool_win.cc aio_win.cc)
TARGET_SOURCES(tpool PRIVATE tpool_win.cc aio_win.cc)
ELSEIF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
TARGET_SOURCES(tpool PRIVATE aio_linux.cc)
OPTION(WITH_URING "Require that io_uring be used" OFF)
OPTION(WITH_LIBAIO "Require that libaio is used, unless uring is there" OFF)
OPTION(WITH_LIBAIO "Require that libaio is used" OFF)
IF(WITH_URING)
SET(URING_REQUIRED REQUIRED)
ELSEIF(WITH_LIBAIO)
ENDIF()
IF(WITH_LIBAIO)
SET(LIBAIO_REQUIRED REQUIRED)
ENDIF()
FIND_PACKAGE(URING QUIET ${URING_REQUIRED})
IF(URING_FOUND)
SET(URING_FOUND ${URING_FOUND} PARENT_SCOPE)
SET(TPOOL_DEFINES "-DHAVE_URING" PARENT_SCOPE)
ADD_DEFINITIONS(-DHAVE_URING)
LINK_LIBRARIES(${URING_LIBRARIES})
INCLUDE_DIRECTORIES(${URING_INCLUDE_DIRS})
SET(EXTRA_SOURCES aio_liburing.cc)
TARGET_COMPILE_DEFINITIONS(tpool PUBLIC "-DHAVE_URING")
TARGET_LINK_LIBRARIES(tpool PRIVATE ${URING_LIBRARIES})
TARGET_INCLUDE_DIRECTORIES(tpool PUBLIC ${URING_INCLUDE_DIRS})
TARGET_SOURCES(tpool PRIVATE aio_liburing.cc)
SET(CMAKE_REQUIRED_INCLUDES_SAVE ${CMAKE_REQUIRED_INCLUDES})
SET(CMAKE_REQUIRED_LIBRARIES_SAVE ${CMAKE_REQUIRED_LIBRARIES})
SET(CMAKE_REQUIRED_INCLUDES ${URING_INCLUDE_DIRS})
@ -27,29 +41,16 @@ ELSEIF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
IF(HAVE_IO_URING_MLOCK_SIZE)
SET_SOURCE_FILES_PROPERTIES(aio_liburing.cc PROPERTIES COMPILE_FLAGS "-DHAVE_IO_URING_MLOCK_SIZE")
ENDIF()
ELSE()
FIND_PACKAGE(LIBAIO QUIET ${LIBAIO_REQUIRED})
IF(LIBAIO_FOUND)
SET(TPOOL_DEFINES "-DLINUX_NATIVE_AIO" PARENT_SCOPE)
ADD_DEFINITIONS(-DLINUX_NATIVE_AIO)
INCLUDE_DIRECTORIES(${LIBAIO_INCLUDE_DIRS})
LINK_LIBRARIES(${LIBAIO_LIBRARIES})
SET(EXTRA_SOURCES aio_linux.cc)
ENDIF()
ENDIF()
ENDIF()
ADD_LIBRARY(tpool STATIC
aio_simulated.cc
tpool_structs.h
CMakeLists.txt
tpool.h
tpool_generic.cc
task_group.cc
task.cc
wait_notification.cc
${EXTRA_SOURCES}
)
FIND_PACKAGE(LIBAIO QUIET ${LIBAIO_REQUIRED})
IF(LIBAIO_FOUND)
TARGET_COMPILE_DEFINITIONS(tpool PUBLIC "-DHAVE_LIBAIO")
TARGET_INCLUDE_DIRECTORIES(tpool PUBLIC ${LIBAIO_INCLUDE_DIRS})
TARGET_LINK_LIBRARIES(tpool PRIVATE ${LIBAIO_LIBRARIES})
TARGET_SOURCES(tpool PRIVATE aio_libaio.cc)
ENDIF()
ENDIF()
IF(URING_FOUND)
ADD_DEPENDENCIES(tpool GenError)

193
tpool/aio_libaio.cc

@ -0,0 +1,193 @@
/* Copyright (C) 2019, 2020, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
#include "tpool.h"
#include <thread>
#include <sys/syscall.h>
#include <libaio.h>
/**
Invoke the io_getevents() system call, without timeout parameter.
@param ctx context from io_setup()
@param min_nr minimum number of completion events to wait for
@param nr maximum number of completion events to collect
@param ev the collected events
In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
the io_getevents() implementation in libaio was "optimized" so that it
would elide the system call when there are no outstanding requests
and a timeout was specified.
The libaio code for dereferencing ctx would occasionally trigger
SIGSEGV if io_destroy() was concurrently invoked from another thread.
Hence, we have to use the raw system call.
WHY are we doing this at all?
Because we want io_destroy() from another thread to interrupt io_getevents().
And, WHY do we want io_destroy() from another thread to interrupt
io_getevents()?
Because there is no documented, libaio-friendly and
race-condition-free way to interrupt io_getevents(). io_destroy()
coupled with raw syscall seemed to work for us so far.
Historical note: in the past, we used io_getevents with
timeouts. We'd wake up periodically, check for shutdown flag, return
from the main routine. This was admittedly safer, yet it did cost
periodic wakeups, which we are not willing to do anymore.
@note we also rely on the undocumented property, that io_destroy(ctx)
will make this version of io_getevents return EINVAL.
*/
static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
  noexcept
{
  /* Remember the caller's errno: on failure the error is reported through
  the negative return value, and errno is put back as it was. */
  const int errno_on_entry= errno;
  /* Raw system call; the final argument 0 is passed in the timeout
  position, so no timeout is specified. */
  const int n_events= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
                              min_nr, nr, ev, 0);
  if (n_events >= 0)
    return n_events;

  const int negated_errno= -errno;
  errno= errno_on_entry;
  return negated_errno;
}
/*
Linux AIO implementation, based on native AIO.
Needs libaio.h and -laio at the compile time.
io_submit() is used to submit async IO.
A single thread will collect the completion notification
with io_getevents() and forward io completion callback to
the worker threadpool.
*/
namespace
{
using namespace tpool;
class aio_libaio final : public aio
{
thread_pool *m_pool;
io_context_t m_io_ctx;
std::thread m_getevent_thread;
static std::atomic<bool> shutdown_in_progress;
static void getevent_thread_routine(aio_libaio *aio)
{
/*
We collect events in small batches to hopefully reduce the
number of system calls.
*/
constexpr unsigned MAX_EVENTS= 256;
aio->m_pool->m_worker_init_callback();
io_event events[MAX_EVENTS];
for (;;)
{
switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
case -EINTR:
continue;
case -EINVAL:
if (shutdown_in_progress)
goto end;
/* fall through */
default:
if (ret < 0)
{
fprintf(stderr, "io_getevents returned %d\n", ret);
abort();
goto end;
}
for (int i= 0; i < ret; i++)
{
const io_event &event= events[i];
aiocb *iocb= reinterpret_cast<aiocb*>(event.obj);
if (static_cast<int>(event.res) < 0)
{
iocb->m_err= -event.res;
iocb->m_ret_len= 0;
}
else
{
iocb->m_ret_len= event.res;
iocb->m_err= 0;
finish_synchronous(iocb);
}
iocb->m_internal_task.m_func= iocb->m_callback;
iocb->m_internal_task.m_arg= iocb;
iocb->m_internal_task.m_group= iocb->m_group;
aio->m_pool->submit_task(&iocb->m_internal_task);
}
}
}
end:
aio->m_pool->m_worker_destroy_callback();
}
public:
aio_libaio(io_context_t ctx, thread_pool *pool)
: m_pool(pool), m_io_ctx(ctx),
m_getevent_thread(getevent_thread_routine, this)
{
}
~aio_libaio()
{
shutdown_in_progress= true;
io_destroy(m_io_ctx);
m_getevent_thread.join();
shutdown_in_progress= false;
}
int submit_io(aiocb *cb) override
{
io_prep_pread(&cb->m_iocb, cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
if (cb->m_opcode != aio_opcode::AIO_PREAD)
cb->m_iocb.aio_lio_opcode= IO_CMD_PWRITE;
iocb *icb= &cb->m_iocb;
int ret= io_submit(m_io_ctx, 1, &icb);
if (ret == 1)
return 0;
errno= -ret;
return -1;
}
int bind(native_file_handle&) override { return 0; }
int unbind(const native_file_handle&) override { return 0; }
const char *get_implementation() const override { return "Linux native AIO"; };
};
std::atomic<bool> aio_libaio::shutdown_in_progress;
}
namespace tpool
{
/**
  Create the libaio-based asynchronous I/O backend.
  @param pool    thread pool that will run the completion callbacks
  @param max_io  number of events passed to io_setup()
  @return the new backend, or nullptr if io_setup() failed
*/
aio *create_libaio(thread_pool *pool, int max_io)
{
  io_context_t io_ctx;
  memset(&io_ctx, 0, sizeof io_ctx);

  const int setup_result= io_setup(max_io, &io_ctx);
  if (setup_result == 0)
    return new aio_libaio(io_ctx, pool);

  fprintf(stderr, "io_setup(%d) returned %d\n", max_io, setup_result);
  return nullptr;
}
}

18
tpool/aio_liburing.cc

@ -79,8 +79,9 @@ public:
thread_= std::thread(thread_routine, this);
}
const char *get_implementation() const override { return "io_uring"; };
~aio_uring() noexcept
~aio_uring() noexcept override
{
{
std::lock_guard<std::mutex> _(mutex_);
@ -103,8 +104,8 @@ public:
int submit_io(tpool::aiocb *cb) final
{
cb->iov_base= cb->m_buffer;
cb->iov_len= cb->m_len;
cb->m_iovec.iov_base= cb->m_buffer;
cb->m_iovec.iov_len= cb->m_len;
// The whole operation since io_uring_get_sqe() and till io_uring_submit()
// must be atomical. This is because liburing provides thread-unsafe calls.
@ -112,11 +113,9 @@ public:
io_uring_sqe *sqe= io_uring_get_sqe(&uring_);
if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD)
io_uring_prep_readv(sqe, cb->m_fh, static_cast<struct iovec *>(cb), 1,
cb->m_offset);
io_uring_prep_readv(sqe, cb->m_fh, &cb->m_iovec, 1, cb->m_offset);
else
io_uring_prep_writev(sqe, cb->m_fh, static_cast<struct iovec *>(cb), 1,
cb->m_offset);
io_uring_prep_writev(sqe, cb->m_fh, &cb->m_iovec, 1, cb->m_offset);
io_uring_sqe_set_data(sqe, cb);
return io_uring_submit(&uring_) == 1 ? 0 : -1;
@ -202,12 +201,11 @@ private:
namespace tpool
{
aio *create_linux_aio(thread_pool *pool, int max_aio)
aio *create_uring(thread_pool *pool, int max_aio)
{
try {
return new aio_uring(pool, max_aio);
} catch (std::runtime_error& error) {
} catch (std::runtime_error&) {
return nullptr;
}
}

211
tpool/aio_linux.cc

@ -1,4 +1,4 @@
/* Copyright (C) 2019, 2020, MariaDB Corporation.
/* Copyright (C) 2025 MariaDB Corporation.
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
@ -13,180 +13,61 @@ You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include "tpool_structs.h"
#include "tpool.h"
# include <thread>
# include <atomic>
# include <cstdio>
# include <libaio.h>
# include <sys/syscall.h>
/**
Invoke the io_getevents() system call, without timeout parameter.
@param ctx context from io_setup()
@param min_nr minimum number of completion events to wait for
@param nr maximum number of completion events to collect
@param ev the collected events
In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
the io_getevents() implementation in libaio was "optimized" so that it
would elide the system call when there are no outstanding requests
and a timeout was specified.
The libaio code for dereferencing ctx would occasionally trigger
SIGSEGV if io_destroy() was concurrently invoked from another thread.
Hence, we have to use the raw system call.
WHY are we doing this at all?
Because we want io_destroy() from another thread to interrupt io_getevents().
And, WHY do we want io_destroy() from another thread to interrupt
io_getevents()?
Because there is no documented, libaio-friendly and race-condition-free way to
interrupt io_getevents(). io_destroy() coupled with raw syscall seemed to work
for us so far.
Historical note : in the past, we used io_getevents with timeouts. We'd wake
up periodically, check for shutdown flag, return from the main routine.
This was admittedly safer, yet it did cost periodic wakeups, which we are not
willing to do anymore.
@note we also rely on the undocumented property, that io_destroy(ctx)
will make this version of io_getevents return EINVAL.
*/
static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
{
  /* Preserve the caller's errno: a failure is reported through the
  negative return value, not through errno. */
  const int saved_errno= errno;
  /* syscall() is variadic, so the "no timeout" argument must be passed
  as a pointer-sized null. A plain int 0 does not have the type of the
  pointer argument that the system call expects. */
  const long ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
                          min_nr, nr, ev, static_cast<void*>(nullptr));
  if (ret >= 0)
    return static_cast<int>(ret); /* number of events collected in ev[] */
  /* Convert the failure to a negative error code and restore errno. */
  const int err= errno;
  errno= saved_errno;
  return -err;
}
/*
Linux AIO implementation, based on native AIO.
Needs libaio.h and -laio at the compile time.
io_submit() is used to submit async IO.
This file exports create_linux_aio() function which is used to create
an asynchronous IO implementation for Linux (currently either libaio or
uring).
*/
A single thread will collect the completion notification
with io_getevents() and forward io completion callback to
the worker threadpool.
*/
#include "tpool.h"
#include <stdio.h>
namespace tpool
{
class aio_linux final : public aio
{
thread_pool *m_pool;
io_context_t m_io_ctx;
std::thread m_getevent_thread;
static std::atomic<bool> shutdown_in_progress;
static void getevent_thread_routine(aio_linux *aio)
{
/*
We collect events in small batches to hopefully reduce the
number of system calls.
*/
constexpr unsigned MAX_EVENTS= 256;
aio->m_pool->m_worker_init_callback();
io_event events[MAX_EVENTS];
for (;;)
{
switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
case -EINTR:
continue;
case -EINVAL:
if (shutdown_in_progress)
goto end;
/* fall through */
default:
if (ret < 0)
{
fprintf(stderr, "io_getevents returned %d\n", ret);
abort();
goto end;
}
for (int i= 0; i < ret; i++)
{
const io_event &event= events[i];
aiocb *iocb= static_cast<aiocb*>(event.obj);
if (static_cast<int>(event.res) < 0)
{
iocb->m_err= -event.res;
iocb->m_ret_len= 0;
}
else
{
iocb->m_ret_len= event.res;
iocb->m_err= 0;
finish_synchronous(iocb);
}
iocb->m_internal_task.m_func= iocb->m_callback;
iocb->m_internal_task.m_arg= iocb;
iocb->m_internal_task.m_group= iocb->m_group;
aio->m_pool->submit_task(&iocb->m_internal_task);
}
}
}
end:
aio->m_pool->m_worker_destroy_callback();
}
public:
aio_linux(io_context_t ctx, thread_pool *pool)
: m_pool(pool), m_io_ctx(ctx),
m_getevent_thread(getevent_thread_routine, this)
{
}
~aio_linux()
{
shutdown_in_progress= true;
io_destroy(m_io_ctx);
m_getevent_thread.join();
shutdown_in_progress= false;
}
// Forward declarations of aio implementations
#ifdef HAVE_LIBAIO
// defined in aio_libaio.cc
aio *create_libaio(thread_pool *pool, int max_io);
#endif
#if defined HAVE_URING
// defined in aio_liburing.cc
aio *create_uring(thread_pool *pool, int max_io);
#endif
int submit_io(aiocb *cb) override
{
io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len,
cb->m_offset);
if (cb->m_opcode != aio_opcode::AIO_PREAD)
cb->aio_lio_opcode= IO_CMD_PWRITE;
iocb *icb= static_cast<iocb*>(cb);
int ret= io_submit(m_io_ctx, 1, &icb);
if (ret == 1)
return 0;
errno= -ret;
return -1;
}
/*
@brief
Choose native linux aio implementation based on availability and user
preference.
int bind(native_file_handle&) override { return 0; }
int unbind(const native_file_handle&) override { return 0; }
};
@param pool - thread pool to use for aio operations
@param max_io - maximum number of concurrent io operations
@param impl - implementation to use: OS_IO_DEFAULT, OS_IO_URING or OS_IO_LIBAIO
std::atomic<bool> aio_linux::shutdown_in_progress;
@returns
A pointer to the aio implementation object, or nullptr if no suitable
implementation is available.
aio *create_linux_aio(thread_pool *pool, int max_io)
If impl is OS_IO_DEFAULT, uring is tried first, falling back to libaio.
If impl is OS_IO_URING or OS_IO_LIBAIO, there is no fallback.
*/
aio *create_linux_aio(thread_pool *pool, int max_io, aio_implementation impl)
{
io_context_t ctx;
memset(&ctx, 0, sizeof ctx);
if (int ret= io_setup(max_io, &ctx))
#ifdef HAVE_URING
if (impl != OS_IO_LIBAIO)
{
fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
return nullptr;
aio *ret= create_uring(pool, max_io);
if (ret)
return ret;
else if (impl != OS_IO_DEFAULT)
return nullptr; // uring is not available
else
fprintf(stderr, "create_uring failed: falling back to libaio\n");
}
return new aio_linux(ctx, pool);
}
#endif
#ifdef HAVE_LIBAIO
if (impl != OS_IO_URING)
return create_libaio(pool, max_io);
#endif
return nullptr;
}
} // namespace tpool

1
tpool/aio_simulated.cc

@ -154,6 +154,7 @@ public:
int bind(native_file_handle &fd) override { return 0; }
int unbind(const native_file_handle &fd) override { return 0; }
/** @return the name of this AIO implementation */
const char *get_implementation() const override
{
  return "simulated";
}
};
aio *create_simulated_aio(thread_pool *tp)

1
tpool/aio_win.cc

@ -131,6 +131,7 @@ public:
: GetLastError();
}
int unbind(const native_file_handle& fd) override { return 0; }
/** @return the name of this AIO implementation */
const char *get_implementation() const override
{
  return "completion ports";
}
};
aio* create_win_aio(thread_pool* pool, int max_io)

65
tpool/tpool.h

@ -19,7 +19,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include <mutex>
#include <atomic>
#include <tpool_structs.h>
#ifdef LINUX_NATIVE_AIO
#ifdef HAVE_LIBAIO
#include <libaio.h>
#endif
#ifdef HAVE_URING
@ -128,12 +128,21 @@ constexpr size_t MAX_AIO_USERDATA_LEN= 4 * sizeof(void*);
struct aiocb
#ifdef _WIN32
:OVERLAPPED
#elif defined LINUX_NATIVE_AIO
:iocb
#elif defined HAVE_URING
:iovec
#endif
{
#if defined HAVE_LIBAIO || defined HAVE_URING
union {
# ifdef HAVE_LIBAIO
/** The context between io_submit() and io_getevents();
must be the first data member! */
iocb m_iocb;
# endif
# ifdef HAVE_URING
/** The context between io_uring_submit() and io_uring_wait_cqe() */
iovec m_iovec;
# endif
};
#endif
native_file_handle m_fh;
aio_opcode m_opcode;
unsigned long long m_offset;
@ -173,6 +182,7 @@ public:
virtual int bind(native_file_handle &fd)= 0;
/** "Unbind" file from AIO handler (used on Windows only) */
virtual int unbind(const native_file_handle &fd)= 0;
virtual const char *get_implementation() const=0;
virtual ~aio(){};
protected:
static void synchronous(aiocb *cb);
@ -202,12 +212,22 @@ class thread_pool;
extern aio *create_simulated_aio(thread_pool *tp);
/** Choice of native asynchronous I/O backend, passed to
thread_pool::configure_aio() and create_native_aio(). */
enum aio_implementation
{
  /** No explicit choice. On Linux, create_linux_aio() tries uring first
  and falls back to libaio. */
  OS_IO_DEFAULT,
#ifdef __linux__
  /** Use uring only; no fallback to libaio */
  OS_IO_URING,
  /** Use libaio only; uring is not attempted */
  OS_IO_LIBAIO,
#endif
};
class thread_pool
{
protected:
/* AIO handler */
std::unique_ptr<aio> m_aio;
virtual aio *create_native_aio(int max_io)= 0;
std::unique_ptr<aio> m_aio{};
aio_implementation m_aio_impl= OS_IO_DEFAULT;
virtual aio *create_native_aio(int max_io, aio_implementation)= 0;
public:
/**
@ -217,10 +237,7 @@ public:
void (*m_worker_init_callback)(void)= [] {};
void (*m_worker_destroy_callback)(void)= [] {};
thread_pool()
: m_aio()
{
}
thread_pool()= default;
virtual void submit_task(task *t)= 0;
virtual timer* create_timer(callback_func func, void *data=nullptr) = 0;
void set_thread_callbacks(void (*init)(), void (*destroy)())
@ -230,10 +247,13 @@ public:
m_worker_init_callback= init;
m_worker_destroy_callback= destroy;
}
int configure_aio(bool use_native_aio, int max_io)
int configure_aio(bool use_native_aio, int max_io, aio_implementation impl)
{
if (use_native_aio)
m_aio.reset(create_native_aio(max_io));
{
m_aio.reset(create_native_aio(max_io, impl));
m_aio_impl= impl;
}
else
m_aio.reset(create_simulated_aio(this));
return !m_aio ? -1 : 0;
@ -244,7 +264,7 @@ public:
assert(m_aio);
if (use_native_aio)
{
auto new_aio = create_native_aio(max_io);
auto new_aio= create_native_aio(max_io, m_aio_impl);
if (!new_aio)
return -1;
m_aio.reset(new_aio);
@ -256,6 +276,10 @@ public:
{
m_aio.reset();
}
/**
  @return the name of the active AIO implementation, as reported by
  aio::get_implementation()
  @note AIO must have been set up successfully. m_aio can be null
  (configure_aio() returns -1 in that case) and must not be
  dereferenced then.
*/
const char *get_aio_implementation() const
{
  assert(m_aio);
  return m_aio->get_implementation();
}
/**
Tweaks how fast worker threads are created, or how often they are signaled.
@ -281,6 +305,19 @@ public:
virtual void wait_end() {};
virtual ~thread_pool() {}
};
/** @return whether this build includes a native AIO backend:
always on Windows, and on Linux only when HAVE_LIBAIO or HAVE_URING
is defined */
constexpr bool supports_native_aio()
{
#if defined(_WIN32) || \
    (defined(__linux__) && (defined(HAVE_LIBAIO) || defined(HAVE_URING)))
  return true;
#else
  return false;
#endif
}
const int DEFAULT_MIN_POOL_THREADS= 1;
const int DEFAULT_MAX_POOL_THREADS= 500;
extern thread_pool *

25
tpool/tpool_generic.cc

@ -37,16 +37,10 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
namespace tpool
{
#ifdef __linux__
#if defined(HAVE_URING) || defined(LINUX_NATIVE_AIO)
extern aio* create_linux_aio(thread_pool* tp, int max_io);
#else
aio *create_linux_aio(thread_pool *, int) { return nullptr; };
#endif
#endif
#ifdef _WIN32
extern aio* create_win_aio(thread_pool* tp, int max_io);
aio *create_linux_aio(thread_pool* tp, int max_io, aio_implementation);
#elif defined _WIN32
aio *create_win_aio(thread_pool* tp, int max_io);
#endif
static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
@ -299,16 +293,15 @@ public:
void wait_begin() override;
void wait_end() override;
void submit_task(task *task) override;
aio *create_native_aio(int max_io) override
{
#ifdef _WIN32
return create_win_aio(this, max_io);
#elif defined(__linux__)
return create_linux_aio(this,max_io);
aio *create_native_aio(int max_io, aio_implementation) override
{ return create_win_aio(this, max_io); }
#elif defined __linux__
aio *create_native_aio(int max_io, aio_implementation impl) override
{ return create_linux_aio(this, max_io, impl); }
#else
return nullptr;
aio *create_native_aio(int, aio_implementation) override { return nullptr; }
#endif
}
class timer_generic : public thr_timer_t, public timer
{

7
tpool/tpool_win.cc

@ -206,6 +206,11 @@ class thread_pool_win : public thread_pool
CloseThreadpoolIo(fd.m_ptp_io);
return 0;
}
/**
  @return the name of this AIO implementation
*/
const char *get_implementation() const override
{
  return "ThreadPool";
}
};
PTP_POOL m_ptp_pool;
@ -268,7 +273,7 @@ public:
abort();
}
aio *create_native_aio(int max_io) override
aio *create_native_aio(int max_io, aio_implementation) override
{
return new native_aio(*this, max_io);
}

Loading…
Cancel
Save