64 changed files with 4464 additions and 1176 deletions
-
35include/mysql/plugin.h
-
1include/mysql/plugin_audit.h.pp
-
1include/mysql/plugin_auth.h.pp
-
1include/mysql/plugin_ftparser.h.pp
-
2mysql-test/include/show_events.inc
-
25mysql-test/r/mysqld--help.result
-
2mysql-test/suite/innodb/r/group_commit_binlog_pos.result
-
2mysql-test/suite/innodb/r/group_commit_binlog_pos_no_optimize_thread.result
-
5mysql-test/suite/perfschema/r/all_instances.result
-
8mysql-test/suite/perfschema/r/dml_setup_instruments.result
-
1mysql-test/suite/rpl/r/rpl_incident.result
-
267mysql-test/suite/rpl/r/rpl_parallel.result
-
1mysql-test/suite/rpl/t/rpl_incident-master.opt
-
7mysql-test/suite/rpl/t/rpl_incident.test
-
353mysql-test/suite/rpl/t/rpl_parallel.test
-
13mysql-test/suite/sys_vars/r/binlog_commit_wait_count_basic.result
-
13mysql-test/suite/sys_vars/r/binlog_commit_wait_usec_basic.result
-
13mysql-test/suite/sys_vars/r/slave_parallel_max_queued_basic.result
-
13mysql-test/suite/sys_vars/r/slave_parallel_threads_basic.result
-
14mysql-test/suite/sys_vars/t/binlog_commit_wait_count_basic.test
-
14mysql-test/suite/sys_vars/t/binlog_commit_wait_usec_basic.test
-
14mysql-test/suite/sys_vars/t/slave_parallel_max_queued_basic.test
-
14mysql-test/suite/sys_vars/t/slave_parallel_threads_basic.test
-
3mysys/my_getopt.c
-
2sql/CMakeLists.txt
-
15sql/handler.cc
-
393sql/log.cc
-
21sql/log.h
-
543sql/log_event.cc
-
194sql/log_event.h
-
144sql/log_event_old.cc
-
45sql/log_event_old.h
-
51sql/mysqld.cc
-
38sql/mysqld.h
-
36sql/rpl_gtid.cc
-
6sql/rpl_gtid.h
-
956sql/rpl_parallel.cc
-
125sql/rpl_parallel.h
-
10sql/rpl_record.cc
-
4sql/rpl_record.h
-
6sql/rpl_record_old.cc
-
2sql/rpl_record_old.h
-
438sql/rpl_rli.cc
-
386sql/rpl_rli.h
-
10sql/rpl_utility.cc
-
2sql/rpl_utility.h
-
4sql/share/errmsg-utf8.txt
-
687sql/slave.cc
-
6sql/slave.h
-
11sql/sp.cc
-
118sql/sql_base.cc
-
11sql/sql_binlog.cc
-
270sql/sql_class.cc
-
158sql/sql_class.h
-
8sql/sql_insert.cc
-
6sql/sql_load.cc
-
1sql/sql_parse.cc
-
77sql/sys_vars.cc
-
5sql/transaction.cc
-
5storage/innobase/handler/ha_innodb.cc
-
5storage/innobase/srv/srv0start.c
-
8storage/sphinx/snippets_udf.cc
-
5storage/xtradb/handler/ha_innodb.cc
-
6storage/xtradb/srv/srv0start.c
@ -0,0 +1,267 @@ |
|||
include/rpl_init.inc [topology=1->2] |
|||
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first |
|||
include/stop_slave.inc |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
CHANGE MASTER TO master_use_gtid=slave_pos; |
|||
include/start_slave.inc |
|||
*** Test long-running query in domain 1 can run in parallel with short queries in domain 0 *** |
|||
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM; |
|||
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB; |
|||
INSERT INTO t1 VALUES (1); |
|||
INSERT INTO t2 VALUES (1); |
|||
LOCK TABLE t1 WRITE; |
|||
SET gtid_domain_id=1; |
|||
INSERT INTO t1 VALUES (2); |
|||
SET gtid_domain_id=0; |
|||
INSERT INTO t2 VALUES (2); |
|||
INSERT INTO t2 VALUES (3); |
|||
BEGIN; |
|||
INSERT INTO t2 VALUES (4); |
|||
INSERT INTO t2 VALUES (5); |
|||
COMMIT; |
|||
INSERT INTO t2 VALUES (6); |
|||
SELECT * FROM t2 ORDER by a; |
|||
a |
|||
1 |
|||
2 |
|||
3 |
|||
4 |
|||
5 |
|||
6 |
|||
SELECT * FROM t1; |
|||
a |
|||
1 |
|||
UNLOCK TABLES; |
|||
SELECT * FROM t1 ORDER BY a; |
|||
a |
|||
1 |
|||
2 |
|||
*** Test two transactions in different domains committed in opposite order on slave but in a single group commit. *** |
|||
include/stop_slave.inc |
|||
SET sql_log_bin=0; |
|||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) |
|||
RETURNS INT DETERMINISTIC |
|||
BEGIN |
|||
RETURN x; |
|||
END |
|||
|| |
|||
SET sql_log_bin=1; |
|||
SET @old_format= @@SESSION.binlog_format; |
|||
SET binlog_format='statement'; |
|||
SET gtid_domain_id=1; |
|||
INSERT INTO t2 VALUES (foo(10, |
|||
'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1', |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready2')); |
|||
FLUSH LOGS; |
|||
SET sql_log_bin=0; |
|||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) |
|||
RETURNS INT DETERMINISTIC |
|||
BEGIN |
|||
IF d1 != '' THEN |
|||
SET debug_sync = d1; |
|||
END IF; |
|||
IF d2 != '' THEN |
|||
SET debug_sync = d2; |
|||
END IF; |
|||
RETURN x; |
|||
END |
|||
|| |
|||
SET sql_log_bin=1; |
|||
SET @old_format=@@GLOBAL.binlog_format; |
|||
SET GLOBAL binlog_format=statement; |
|||
SET GLOBAL slave_parallel_threads=0; |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
include/start_slave.inc |
|||
SET debug_sync='now WAIT_FOR ready1'; |
|||
SET gtid_domain_id=2; |
|||
INSERT INTO t2 VALUES (foo(11, |
|||
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3', |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4')); |
|||
SET gtid_domain_id=0; |
|||
SELECT * FROM t2 WHERE a >= 10 ORDER BY a; |
|||
a |
|||
10 |
|||
11 |
|||
SET debug_sync='now WAIT_FOR ready3'; |
|||
SET debug_sync='now SIGNAL cont3'; |
|||
SET debug_sync='now WAIT_FOR ready4'; |
|||
SET debug_sync='now SIGNAL cont1'; |
|||
SET debug_sync='now WAIT_FOR ready2'; |
|||
SET debug_sync='now SIGNAL cont4'; |
|||
SELECT * FROM t2 WHERE a >= 10 ORDER BY a; |
|||
a |
|||
10 |
|||
11 |
|||
show binlog events in 'slave-bin.000002' from <binlog_start>; |
|||
Log_name Pos Event_type Server_id End_log_pos Info |
|||
slave-bin.000002 # Binlog_checkpoint # # slave-bin.000002 |
|||
slave-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=# |
|||
slave-bin.000002 # Query # # use `test`; INSERT INTO t2 VALUES (foo(11, |
|||
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3', |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4')) |
|||
slave-bin.000002 # Xid # # COMMIT /* XID */ |
|||
slave-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=# |
|||
slave-bin.000002 # Query # # use `test`; INSERT INTO t2 VALUES (foo(10, |
|||
'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1', |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready2')) |
|||
slave-bin.000002 # Xid # # COMMIT /* XID */ |
|||
FLUSH LOGS; |
|||
include/stop_slave.inc |
|||
SET GLOBAL slave_parallel_threads=0; |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
SET debug_sync='RESET'; |
|||
include/start_slave.inc |
|||
*** Test that group-committed transactions on the master can replicate in parallel on the slave. *** |
|||
FLUSH LOGS; |
|||
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; |
|||
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7); |
|||
BEGIN; |
|||
INSERT INTO t3 VALUES (2,102); |
|||
BEGIN; |
|||
INSERT INTO t3 VALUES (4,104); |
|||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1'; |
|||
SET binlog_format=statement; |
|||
INSERT INTO t3 VALUES (2, foo(12, |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1', |
|||
'')); |
|||
SET debug_sync='now WAIT_FOR master_queued1'; |
|||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2'; |
|||
SET binlog_format=statement; |
|||
INSERT INTO t3 VALUES (4, foo(14, |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2', |
|||
'')); |
|||
SET debug_sync='now WAIT_FOR master_queued2'; |
|||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3'; |
|||
SET binlog_format=statement; |
|||
INSERT INTO t3 VALUES (6, foo(16, |
|||
'group_commit_waiting_for_prior SIGNAL slave_queued3', |
|||
'')); |
|||
SET debug_sync='now WAIT_FOR master_queued3'; |
|||
SET debug_sync='now SIGNAL master_cont1'; |
|||
SELECT * FROM t3 ORDER BY a; |
|||
a b |
|||
1 1 |
|||
2 12 |
|||
3 3 |
|||
4 14 |
|||
5 5 |
|||
6 16 |
|||
7 7 |
|||
show binlog events in 'master-bin.000002' from <binlog_start>; |
|||
Log_name Pos Event_type Server_id End_log_pos Info |
|||
master-bin.000002 # Binlog_checkpoint # # master-bin.000002 |
|||
master-bin.000002 # Gtid # # GTID #-#-# |
|||
master-bin.000002 # Query # # use `test`; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB |
|||
master-bin.000002 # Gtid # # BEGIN GTID #-#-# |
|||
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7) |
|||
master-bin.000002 # Xid # # COMMIT /* XID */ |
|||
master-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=# |
|||
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (2, foo(12, |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1', |
|||
'')) |
|||
master-bin.000002 # Xid # # COMMIT /* XID */ |
|||
master-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=# |
|||
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (4, foo(14, |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2', |
|||
'')) |
|||
master-bin.000002 # Xid # # COMMIT /* XID */ |
|||
master-bin.000002 # Gtid # # BEGIN GTID #-#-# cid=# |
|||
master-bin.000002 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16, |
|||
'group_commit_waiting_for_prior SIGNAL slave_queued3', |
|||
'')) |
|||
master-bin.000002 # Xid # # COMMIT /* XID */ |
|||
SET debug_sync='now WAIT_FOR slave_queued3'; |
|||
ROLLBACK; |
|||
SET debug_sync='now WAIT_FOR slave_queued1'; |
|||
ROLLBACK; |
|||
SET debug_sync='now WAIT_FOR slave_queued2'; |
|||
SET debug_sync='now SIGNAL slave_cont1'; |
|||
SELECT * FROM t3 ORDER BY a; |
|||
a b |
|||
1 1 |
|||
2 12 |
|||
3 3 |
|||
4 14 |
|||
5 5 |
|||
6 16 |
|||
7 7 |
|||
show binlog events in 'slave-bin.000003' from <binlog_start>; |
|||
Log_name Pos Event_type Server_id End_log_pos Info |
|||
slave-bin.000003 # Binlog_checkpoint # # slave-bin.000003 |
|||
slave-bin.000003 # Gtid # # GTID #-#-# |
|||
slave-bin.000003 # Query # # use `test`; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB |
|||
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# |
|||
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7) |
|||
slave-bin.000003 # Xid # # COMMIT /* XID */ |
|||
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=# |
|||
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (2, foo(12, |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1', |
|||
'')) |
|||
slave-bin.000003 # Xid # # COMMIT /* XID */ |
|||
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=# |
|||
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (4, foo(14, |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2', |
|||
'')) |
|||
slave-bin.000003 # Xid # # COMMIT /* XID */ |
|||
slave-bin.000003 # Gtid # # BEGIN GTID #-#-# cid=# |
|||
slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16, |
|||
'group_commit_waiting_for_prior SIGNAL slave_queued3', |
|||
'')) |
|||
slave-bin.000003 # Xid # # COMMIT /* XID */ |
|||
*** Test STOP SLAVE in parallel mode *** |
|||
include/stop_slave.inc |
|||
SET binlog_direct_non_transactional_updates=0; |
|||
SET sql_log_bin=0; |
|||
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction"); |
|||
SET sql_log_bin=1; |
|||
BEGIN; |
|||
INSERT INTO t2 VALUES (20); |
|||
INSERT INTO t1 VALUES (20); |
|||
INSERT INTO t2 VALUES (21); |
|||
INSERT INTO t3 VALUES (20, 20); |
|||
COMMIT; |
|||
INSERT INTO t3 VALUES(21, 21); |
|||
INSERT INTO t3 VALUES(22, 22); |
|||
SET binlog_format=@old_format; |
|||
BEGIN; |
|||
INSERT INTO t2 VALUES (21); |
|||
START SLAVE; |
|||
STOP SLAVE; |
|||
ROLLBACK; |
|||
include/wait_for_slave_to_stop.inc |
|||
SELECT * FROM t1 WHERE a >= 20 ORDER BY a; |
|||
a |
|||
20 |
|||
SELECT * FROM t2 WHERE a >= 20 ORDER BY a; |
|||
a |
|||
20 |
|||
21 |
|||
SELECT * FROM t3 WHERE a >= 20 ORDER BY a; |
|||
a b |
|||
20 20 |
|||
include/start_slave.inc |
|||
SELECT * FROM t1 WHERE a >= 20 ORDER BY a; |
|||
a |
|||
20 |
|||
SELECT * FROM t2 WHERE a >= 20 ORDER BY a; |
|||
a |
|||
20 |
|||
21 |
|||
SELECT * FROM t3 WHERE a >= 20 ORDER BY a; |
|||
a b |
|||
20 20 |
|||
21 21 |
|||
22 22 |
|||
include/stop_slave.inc |
|||
SET GLOBAL binlog_format=@old_format; |
|||
SET GLOBAL slave_parallel_threads=0; |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
include/start_slave.inc |
|||
include/stop_slave.inc |
|||
SET GLOBAL slave_parallel_threads=@old_parallel_threads; |
|||
include/start_slave.inc |
|||
DROP function foo; |
|||
DROP TABLE t1,t2,t3; |
|||
include/rpl_end.inc |
|||
@ -1 +0,0 @@ |
|||
--loose-debug=+d,incident_database_resync_on_replace |
|||
@ -0,0 +1,353 @@ |
|||
--source include/have_innodb.inc |
|||
--source include/have_debug.inc |
|||
--source include/have_debug_sync.inc |
|||
--let $rpl_topology=1->2 |
|||
--source include/rpl_init.inc |
|||
|
|||
# Test various aspects of parallel replication. |
|||
|
|||
--connection server_2 |
|||
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; |
|||
--error ER_SLAVE_MUST_STOP |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
--source include/stop_slave.inc |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
CHANGE MASTER TO master_use_gtid=slave_pos; |
|||
--source include/start_slave.inc |
|||
|
|||
|
|||
--echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 *** |
|||
|
|||
--connection server_1 |
|||
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM; |
|||
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB; |
|||
INSERT INTO t1 VALUES (1); |
|||
INSERT INTO t2 VALUES (1); |
|||
--save_master_pos |
|||
|
|||
--connection server_2 |
|||
--sync_with_master |
|||
|
|||
# Block the table t1 to simulate a replicated query taking a long time. |
|||
--connect (con_temp1,127.0.0.1,root,,test,$SERVER_MYPORT_2,) |
|||
LOCK TABLE t1 WRITE; |
|||
|
|||
--connection server_1 |
|||
SET gtid_domain_id=1; |
|||
# This query will be blocked on the slave until UNLOCK TABLES. |
|||
INSERT INTO t1 VALUES (2); |
|||
SET gtid_domain_id=0; |
|||
# These t2 queries can be replicated in parallel with the prior t1 query, as |
|||
# they are in a separate replication domain. |
|||
INSERT INTO t2 VALUES (2); |
|||
INSERT INTO t2 VALUES (3); |
|||
BEGIN; |
|||
INSERT INTO t2 VALUES (4); |
|||
INSERT INTO t2 VALUES (5); |
|||
COMMIT; |
|||
INSERT INTO t2 VALUES (6); |
|||
|
|||
--connection server_2 |
|||
--let $wait_condition= SELECT COUNT(*) = 6 FROM t2 |
|||
--source include/wait_condition.inc |
|||
|
|||
SELECT * FROM t2 ORDER by a; |
|||
|
|||
--connection con_temp1 |
|||
SELECT * FROM t1; |
|||
UNLOCK TABLES; |
|||
|
|||
--connection server_2 |
|||
--let $wait_condition= SELECT COUNT(*) = 2 FROM t1 |
|||
--source include/wait_condition.inc |
|||
|
|||
SELECT * FROM t1 ORDER BY a; |
|||
|
|||
|
|||
--echo *** Test two transactions in different domains committed in opposite order on slave but in a single group commit. *** |
|||
--connection server_2 |
|||
--source include/stop_slave.inc |
|||
|
|||
--connection server_1 |
|||
# Use a stored function to inject a debug_sync into the appropriate THD. |
|||
# The function does nothing on the master, and on the slave it injects the |
|||
# desired debug_sync action(s). |
|||
SET sql_log_bin=0; |
|||
--delimiter || |
|||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) |
|||
RETURNS INT DETERMINISTIC |
|||
BEGIN |
|||
RETURN x; |
|||
END |
|||
|| |
|||
--delimiter ; |
|||
SET sql_log_bin=1; |
|||
|
|||
SET @old_format= @@SESSION.binlog_format; |
|||
SET binlog_format='statement'; |
|||
SET gtid_domain_id=1; |
|||
INSERT INTO t2 VALUES (foo(10, |
|||
'commit_before_enqueue SIGNAL ready1 WAIT_FOR cont1', |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready2')); |
|||
|
|||
--connection server_2 |
|||
FLUSH LOGS; |
|||
--source include/wait_for_binlog_checkpoint.inc |
|||
SET sql_log_bin=0; |
|||
--delimiter || |
|||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) |
|||
RETURNS INT DETERMINISTIC |
|||
BEGIN |
|||
IF d1 != '' THEN |
|||
SET debug_sync = d1; |
|||
END IF; |
|||
IF d2 != '' THEN |
|||
SET debug_sync = d2; |
|||
END IF; |
|||
RETURN x; |
|||
END |
|||
|| |
|||
--delimiter ; |
|||
SET sql_log_bin=1; |
|||
SET @old_format=@@GLOBAL.binlog_format; |
|||
SET GLOBAL binlog_format=statement; |
|||
# We need to restart all parallel threads for the new global setting to |
|||
# be copied to the session-level values. |
|||
SET GLOBAL slave_parallel_threads=0; |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
--source include/start_slave.inc |
|||
|
|||
# First make sure the first insert is ready to commit, but not queued yet. |
|||
SET debug_sync='now WAIT_FOR ready1'; |
|||
|
|||
--connection server_1 |
|||
SET gtid_domain_id=2; |
|||
INSERT INTO t2 VALUES (foo(11, |
|||
'commit_before_enqueue SIGNAL ready3 WAIT_FOR cont3', |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL ready4 WAIT_FOR cont4')); |
|||
SET gtid_domain_id=0; |
|||
SELECT * FROM t2 WHERE a >= 10 ORDER BY a; |
|||
|
|||
--connection server_2 |
|||
# Now wait for the second insert to queue itself as the leader, and then |
|||
# wait for more commits to queue up. |
|||
SET debug_sync='now WAIT_FOR ready3'; |
|||
SET debug_sync='now SIGNAL cont3'; |
|||
SET debug_sync='now WAIT_FOR ready4'; |
|||
# Now allow the first insert to queue up to participate in group commit. |
|||
SET debug_sync='now SIGNAL cont1'; |
|||
SET debug_sync='now WAIT_FOR ready2'; |
|||
# Finally allow the second insert to proceed and do the group commit. |
|||
SET debug_sync='now SIGNAL cont4'; |
|||
|
|||
--let $wait_condition= SELECT COUNT(*) = 2 FROM t2 WHERE a >= 10 |
|||
--source include/wait_condition.inc |
|||
SELECT * FROM t2 WHERE a >= 10 ORDER BY a; |
|||
# The two INSERT transactions should have been committed in opposite order, |
|||
# but in the same group commit (seen by precense of cid=# in the SHOW |
|||
# BINLOG output). |
|||
--let $binlog_file= slave-bin.000002 |
|||
--source include/show_binlog_events.inc |
|||
FLUSH LOGS; |
|||
--source include/wait_for_binlog_checkpoint.inc |
|||
|
|||
# Restart all the slave parallel worker threads, to clear all debug_sync actions. |
|||
--connection server_2 |
|||
--source include/stop_slave.inc |
|||
SET GLOBAL slave_parallel_threads=0; |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
SET debug_sync='RESET'; |
|||
--source include/start_slave.inc |
|||
|
|||
|
|||
--echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. *** |
|||
--connection server_1 |
|||
FLUSH LOGS; |
|||
--source include/wait_for_binlog_checkpoint.inc |
|||
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; |
|||
# Create some sentinel rows so that the rows inserted in parallel fall into |
|||
# separate gaps and do not cause gap lock conflicts. |
|||
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7); |
|||
--save_master_pos |
|||
--connection server_2 |
|||
--sync_with_master |
|||
|
|||
# We want to test that the transactions can execute out-of-order on |
|||
# the slave, but still end up committing in-order, and in a single |
|||
# group commit. |
|||
# |
|||
# The idea is to group-commit three transactions together on the master: |
|||
# A, B, and C. On the slave, C will execute the insert first, then A, |
|||
# and then B. But B manages to complete before A has time to commit, so |
|||
# all three end up committing together. |
|||
# |
|||
# So we start by setting up some row locks that will block transactions |
|||
# A and B from executing, allowing C to run first. |
|||
|
|||
--connection con_temp1 |
|||
BEGIN; |
|||
INSERT INTO t3 VALUES (2,102); |
|||
--connect (con_temp2,127.0.0.1,root,,test,$SERVER_MYPORT_2,) |
|||
BEGIN; |
|||
INSERT INTO t3 VALUES (4,104); |
|||
|
|||
# On the master, queue three INSERT transactions as a single group commit. |
|||
--connect (con_temp3,127.0.0.1,root,,test,$SERVER_MYPORT_1,) |
|||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1'; |
|||
SET binlog_format=statement; |
|||
send INSERT INTO t3 VALUES (2, foo(12, |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued1 WAIT_FOR slave_cont1', |
|||
'')); |
|||
|
|||
--connection server_1 |
|||
SET debug_sync='now WAIT_FOR master_queued1'; |
|||
|
|||
--connect (con_temp4,127.0.0.1,root,,test,$SERVER_MYPORT_1,) |
|||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2'; |
|||
SET binlog_format=statement; |
|||
send INSERT INTO t3 VALUES (4, foo(14, |
|||
'commit_after_release_LOCK_prepare_ordered SIGNAL slave_queued2', |
|||
'')); |
|||
|
|||
--connection server_1 |
|||
SET debug_sync='now WAIT_FOR master_queued2'; |
|||
|
|||
--connect (con_temp5,127.0.0.1,root,,test,$SERVER_MYPORT_1,) |
|||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3'; |
|||
SET binlog_format=statement; |
|||
send INSERT INTO t3 VALUES (6, foo(16, |
|||
'group_commit_waiting_for_prior SIGNAL slave_queued3', |
|||
'')); |
|||
|
|||
--connection server_1 |
|||
SET debug_sync='now WAIT_FOR master_queued3'; |
|||
SET debug_sync='now SIGNAL master_cont1'; |
|||
|
|||
--connection con_temp3 |
|||
REAP; |
|||
--connection con_temp4 |
|||
REAP; |
|||
--connection con_temp5 |
|||
REAP; |
|||
|
|||
--connection server_1 |
|||
SELECT * FROM t3 ORDER BY a; |
|||
--let $binlog_file= master-bin.000002 |
|||
--source include/show_binlog_events.inc |
|||
|
|||
# First, wait until insert 3 is ready to queue up for group commit, but is |
|||
# waiting for insert 2 to commit before it can do so itself. |
|||
--connection server_2 |
|||
SET debug_sync='now WAIT_FOR slave_queued3'; |
|||
|
|||
# Next, let insert 1 proceed, and allow it to queue up as the group commit |
|||
# leader, but let it wait for insert 2 to also queue up before proceeding. |
|||
--connection con_temp1 |
|||
ROLLBACK; |
|||
--connection server_2 |
|||
SET debug_sync='now WAIT_FOR slave_queued1'; |
|||
|
|||
# Now let insert 2 proceed and queue up. |
|||
--connection con_temp2 |
|||
ROLLBACK; |
|||
--connection server_2 |
|||
SET debug_sync='now WAIT_FOR slave_queued2'; |
|||
# And finally, we can let insert 1 proceed and do the group commit with all |
|||
# three insert transactions together. |
|||
SET debug_sync='now SIGNAL slave_cont1'; |
|||
|
|||
# Wait for the commit to complete and check that all three transactions |
|||
# group-committed together (will be seen in the binlog as all three having |
|||
# cid=# on their GTID event). |
|||
--let $wait_condition= SELECT COUNT(*) = 3 FROM t3 WHERE a IN (2,4,6) |
|||
--source include/wait_condition.inc |
|||
SELECT * FROM t3 ORDER BY a; |
|||
--let $binlog_file= slave-bin.000003 |
|||
--source include/show_binlog_events.inc |
|||
|
|||
|
|||
--echo *** Test STOP SLAVE in parallel mode *** |
|||
--connection server_2 |
|||
--source include/stop_slave.inc |
|||
|
|||
--connection server_1 |
|||
# Set up a couple of transactions. The first will be blocked halfway |
|||
# through on a lock, and while it is blocked we initiate STOP SLAVE. |
|||
# We then test that the halfway-initiated transaction is allowed to |
|||
# complete, but no subsequent ones. |
|||
# We have to use statement-based mode and set |
|||
# binlog_direct_non_transactional_updates=0; otherwise the binlog will |
|||
# be split into two event groups, one for the MyISAM part and one for the |
|||
# InnoDB part. |
|||
SET binlog_direct_non_transactional_updates=0; |
|||
SET sql_log_bin=0; |
|||
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction"); |
|||
SET sql_log_bin=1; |
|||
BEGIN; |
|||
INSERT INTO t2 VALUES (20); |
|||
--disable_warnings |
|||
INSERT INTO t1 VALUES (20); |
|||
--disable_warnings |
|||
INSERT INTO t2 VALUES (21); |
|||
INSERT INTO t3 VALUES (20, 20); |
|||
COMMIT; |
|||
INSERT INTO t3 VALUES(21, 21); |
|||
INSERT INTO t3 VALUES(22, 22); |
|||
SET binlog_format=@old_format; |
|||
--save_master_pos |
|||
|
|||
# Start a connection that will block the replicated transaction halfway. |
|||
--connection con_temp1 |
|||
BEGIN; |
|||
INSERT INTO t2 VALUES (21); |
|||
|
|||
--connection server_2 |
|||
START SLAVE; |
|||
# Wait for the MyISAM change to be visible, after which replication will wait |
|||
# for con_temp1 to roll back. |
|||
--let $wait_condition= SELECT COUNT(*) = 1 FROM t1 WHERE a=20 |
|||
--source include/wait_condition.inc |
|||
|
|||
--connection con_temp2 |
|||
# Initiate slave stop. It will have to wait for the current event group |
|||
# to complete. |
|||
send STOP SLAVE; |
|||
|
|||
--connection con_temp1 |
|||
ROLLBACK; |
|||
|
|||
--connection con_temp2 |
|||
reap; |
|||
|
|||
--connection server_2 |
|||
--source include/wait_for_slave_to_stop.inc |
|||
# We should see the first transaction applied, but not the two others. |
|||
SELECT * FROM t1 WHERE a >= 20 ORDER BY a; |
|||
SELECT * FROM t2 WHERE a >= 20 ORDER BY a; |
|||
SELECT * FROM t3 WHERE a >= 20 ORDER BY a; |
|||
|
|||
--source include/start_slave.inc |
|||
--sync_with_master |
|||
SELECT * FROM t1 WHERE a >= 20 ORDER BY a; |
|||
SELECT * FROM t2 WHERE a >= 20 ORDER BY a; |
|||
SELECT * FROM t3 WHERE a >= 20 ORDER BY a; |
|||
|
|||
|
|||
--connection server_2 |
|||
--source include/stop_slave.inc |
|||
SET GLOBAL binlog_format=@old_format; |
|||
SET GLOBAL slave_parallel_threads=0; |
|||
SET GLOBAL slave_parallel_threads=10; |
|||
--source include/start_slave.inc |
|||
|
|||
|
|||
--connection server_2 |
|||
--source include/stop_slave.inc |
|||
SET GLOBAL slave_parallel_threads=@old_parallel_threads; |
|||
--source include/start_slave.inc |
|||
|
|||
--connection server_1 |
|||
DROP function foo; |
|||
DROP TABLE t1,t2,t3; |
|||
|
|||
--source include/rpl_end.inc |
|||
@ -0,0 +1,13 @@ |
|||
SET @save_binlog_commit_wait_count= @@GLOBAL.binlog_commit_wait_count; |
|||
SELECT @@GLOBAL.binlog_commit_wait_count as 'must be zero because of default'; |
|||
must be zero because of default |
|||
0 |
|||
SELECT @@SESSION.binlog_commit_wait_count as 'no session var'; |
|||
ERROR HY000: Variable 'binlog_commit_wait_count' is a GLOBAL variable |
|||
SET GLOBAL binlog_commit_wait_count= 0; |
|||
SET GLOBAL binlog_commit_wait_count= DEFAULT; |
|||
SET GLOBAL binlog_commit_wait_count= 10; |
|||
SELECT @@GLOBAL.binlog_commit_wait_count; |
|||
@@GLOBAL.binlog_commit_wait_count |
|||
10 |
|||
SET GLOBAL binlog_commit_wait_count = @save_binlog_commit_wait_count; |
|||
@ -0,0 +1,13 @@ |
|||
SET @save_binlog_commit_wait_usec= @@GLOBAL.binlog_commit_wait_usec; |
|||
SELECT @@GLOBAL.binlog_commit_wait_usec as 'check default'; |
|||
check default |
|||
100000 |
|||
SELECT @@SESSION.binlog_commit_wait_usec as 'no session var'; |
|||
ERROR HY000: Variable 'binlog_commit_wait_usec' is a GLOBAL variable |
|||
SET GLOBAL binlog_commit_wait_usec= 0; |
|||
SET GLOBAL binlog_commit_wait_usec= DEFAULT; |
|||
SET GLOBAL binlog_commit_wait_usec= 10000; |
|||
SELECT @@GLOBAL.binlog_commit_wait_usec; |
|||
@@GLOBAL.binlog_commit_wait_usec |
|||
10000 |
|||
SET GLOBAL binlog_commit_wait_usec = @save_binlog_commit_wait_usec; |
|||
@ -0,0 +1,13 @@ |
|||
SET @save_slave_parallel_max_queued= @@GLOBAL.slave_parallel_max_queued; |
|||
SELECT @@GLOBAL.slave_parallel_max_queued as 'Check default'; |
|||
Check default |
|||
131072 |
|||
SELECT @@SESSION.slave_parallel_max_queued as 'no session var'; |
|||
ERROR HY000: Variable 'slave_parallel_max_queued' is a GLOBAL variable |
|||
SET GLOBAL slave_parallel_max_queued= 0; |
|||
SET GLOBAL slave_parallel_max_queued= DEFAULT; |
|||
SET GLOBAL slave_parallel_max_queued= 65536; |
|||
SELECT @@GLOBAL.slave_parallel_max_queued; |
|||
@@GLOBAL.slave_parallel_max_queued |
|||
65536 |
|||
SET GLOBAL slave_parallel_max_queued = @save_slave_parallel_max_queued; |
|||
@ -0,0 +1,13 @@ |
|||
SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads; |
|||
SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default'; |
|||
must be zero because of default |
|||
0 |
|||
SELECT @@SESSION.slave_parallel_threads as 'no session var'; |
|||
ERROR HY000: Variable 'slave_parallel_threads' is a GLOBAL variable |
|||
SET GLOBAL slave_parallel_threads= 0; |
|||
SET GLOBAL slave_parallel_threads= DEFAULT; |
|||
SET GLOBAL slave_parallel_threads= 10; |
|||
SELECT @@GLOBAL.slave_parallel_threads; |
|||
@@GLOBAL.slave_parallel_threads |
|||
10 |
|||
SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads; |
|||
@ -0,0 +1,14 @@ |
|||
--source include/not_embedded.inc |
|||
|
|||
SET @save_binlog_commit_wait_count= @@GLOBAL.binlog_commit_wait_count; |
|||
|
|||
SELECT @@GLOBAL.binlog_commit_wait_count as 'must be zero because of default'; |
|||
--error ER_INCORRECT_GLOBAL_LOCAL_VAR |
|||
SELECT @@SESSION.binlog_commit_wait_count as 'no session var'; |
|||
|
|||
SET GLOBAL binlog_commit_wait_count= 0; |
|||
SET GLOBAL binlog_commit_wait_count= DEFAULT; |
|||
SET GLOBAL binlog_commit_wait_count= 10; |
|||
SELECT @@GLOBAL.binlog_commit_wait_count; |
|||
|
|||
SET GLOBAL binlog_commit_wait_count = @save_binlog_commit_wait_count; |
|||
@ -0,0 +1,14 @@ |
|||
--source include/not_embedded.inc |
|||
|
|||
SET @save_binlog_commit_wait_usec= @@GLOBAL.binlog_commit_wait_usec; |
|||
|
|||
SELECT @@GLOBAL.binlog_commit_wait_usec as 'check default'; |
|||
--error ER_INCORRECT_GLOBAL_LOCAL_VAR |
|||
SELECT @@SESSION.binlog_commit_wait_usec as 'no session var'; |
|||
|
|||
SET GLOBAL binlog_commit_wait_usec= 0; |
|||
SET GLOBAL binlog_commit_wait_usec= DEFAULT; |
|||
SET GLOBAL binlog_commit_wait_usec= 10000; |
|||
SELECT @@GLOBAL.binlog_commit_wait_usec; |
|||
|
|||
SET GLOBAL binlog_commit_wait_usec = @save_binlog_commit_wait_usec; |
|||
@ -0,0 +1,14 @@ |
|||
--source include/not_embedded.inc |
|||
|
|||
SET @save_slave_parallel_max_queued= @@GLOBAL.slave_parallel_max_queued; |
|||
|
|||
SELECT @@GLOBAL.slave_parallel_max_queued as 'Check default'; |
|||
--error ER_INCORRECT_GLOBAL_LOCAL_VAR |
|||
SELECT @@SESSION.slave_parallel_max_queued as 'no session var'; |
|||
|
|||
SET GLOBAL slave_parallel_max_queued= 0; |
|||
SET GLOBAL slave_parallel_max_queued= DEFAULT; |
|||
SET GLOBAL slave_parallel_max_queued= 65536; |
|||
SELECT @@GLOBAL.slave_parallel_max_queued; |
|||
|
|||
SET GLOBAL slave_parallel_max_queued = @save_slave_parallel_max_queued; |
|||
@ -0,0 +1,14 @@ |
|||
--source include/not_embedded.inc |
|||
|
|||
SET @save_slave_parallel_threads= @@GLOBAL.slave_parallel_threads; |
|||
|
|||
SELECT @@GLOBAL.slave_parallel_threads as 'must be zero because of default'; |
|||
--error ER_INCORRECT_GLOBAL_LOCAL_VAR |
|||
SELECT @@SESSION.slave_parallel_threads as 'no session var'; |
|||
|
|||
SET GLOBAL slave_parallel_threads= 0; |
|||
SET GLOBAL slave_parallel_threads= DEFAULT; |
|||
SET GLOBAL slave_parallel_threads= 10; |
|||
SELECT @@GLOBAL.slave_parallel_threads; |
|||
|
|||
SET GLOBAL slave_parallel_threads = @save_slave_parallel_threads; |
|||
543
sql/log_event.cc
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,956 @@ |
|||
#include "my_global.h"
|
|||
#include "rpl_parallel.h"
|
|||
#include "slave.h"
|
|||
#include "rpl_mi.h"
|
|||
|
|||
|
|||
/*
|
|||
Code for optional parallel execution of replicated events on the slave. |
|||
|
|||
ToDo list: |
|||
|
|||
- Retry of failed transactions is not yet implemented for the parallel case. |
|||
|
|||
- All the waits (eg. in struct wait_for_commit and in |
|||
rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill, |
|||
everything needs to be correctly rolled back and stopped in all threads, |
|||
to ensure a consistent slave replication state. |
|||
|
|||
- Handle the case of a partial event group. This occurs when the master |
|||
crashes in the middle of writing the event group to the binlog. The |
|||
slave rolls back the transaction; parallel execution needs to be able |
|||
to deal with this wrt. commit_orderer and such. |
|||
See Format_description_log_event::do_apply_event(). |
|||
*/ |
|||
|
|||
struct rpl_parallel_thread_pool global_rpl_thread_pool; |
|||
|
|||
|
|||
static int |
|||
rpt_handle_event(rpl_parallel_thread::queued_event *qev, |
|||
struct rpl_parallel_thread *rpt) |
|||
{ |
|||
int err __attribute__((unused)); |
|||
rpl_group_info *rgi= qev->rgi; |
|||
Relay_log_info *rli= rgi->rli; |
|||
THD *thd= rgi->thd; |
|||
|
|||
thd->rgi_slave= rgi; |
|||
thd->rpl_filter = rli->mi->rpl_filter; |
|||
|
|||
/* ToDo: Access to thd, and what about rli, split out a parallel part? */ |
|||
mysql_mutex_lock(&rli->data_lock); |
|||
qev->ev->thd= thd; |
|||
strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name); |
|||
rgi->event_relay_log_name= rgi->event_relay_log_name_buf; |
|||
rgi->event_relay_log_pos= qev->event_relay_log_pos; |
|||
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; |
|||
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); |
|||
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); |
|||
thd->rgi_slave= NULL; |
|||
|
|||
thread_safe_increment64(&rli->executed_entries, |
|||
&slave_executed_entries_lock); |
|||
/* ToDo: error handling. */ |
|||
return err; |
|||
} |
|||
|
|||
|
|||
static void |
|||
handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) |
|||
{ |
|||
int cmp; |
|||
Relay_log_info *rli; |
|||
/*
|
|||
Events that are not part of an event group, such as Format Description, |
|||
Stop, GTID List and such, are executed directly in the driver SQL thread, |
|||
to keep the relay log state up-to-date. But the associated position update |
|||
is done here, in sync with other normal events as they are queued to |
|||
worker threads. |
|||
*/ |
|||
if ((thd->variables.option_bits & OPTION_BEGIN) && |
|||
opt_using_transactions) |
|||
return; |
|||
rli= qev->rgi->rli; |
|||
mysql_mutex_lock(&rli->data_lock); |
|||
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); |
|||
if (cmp < 0) |
|||
{ |
|||
rli->group_relay_log_pos= qev->future_event_relay_log_pos; |
|||
strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name); |
|||
rli->notify_group_relay_log_name_update(); |
|||
} else if (cmp == 0 && |
|||
rli->group_relay_log_pos < qev->future_event_relay_log_pos) |
|||
rli->group_relay_log_pos= qev->future_event_relay_log_pos; |
|||
|
|||
cmp= strcmp(rli->group_master_log_name, qev->future_event_master_log_name); |
|||
if (cmp < 0) |
|||
{ |
|||
strcpy(rli->group_master_log_name, qev->future_event_master_log_name); |
|||
rli->notify_group_master_log_name_update(); |
|||
rli->group_master_log_pos= qev->future_event_master_log_pos; |
|||
} |
|||
else if (cmp == 0 |
|||
&& rli->group_master_log_pos < qev->future_event_master_log_pos) |
|||
rli->group_master_log_pos= qev->future_event_master_log_pos; |
|||
mysql_mutex_unlock(&rli->data_lock); |
|||
mysql_cond_broadcast(&rli->data_cond); |
|||
} |
|||
|
|||
|
|||
static bool |
|||
sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) |
|||
{ |
|||
if (!rgi->rli->abort_slave && !abort_loop) |
|||
return false; |
|||
|
|||
/*
|
|||
Do not abort in the middle of an event group that cannot be rolled back. |
|||
*/ |
|||
if ((thd->transaction.all.modified_non_trans_table || |
|||
(thd->variables.option_bits & OPTION_KEEP_LOG)) |
|||
&& in_event_group) |
|||
return false; |
|||
/* ToDo: should we add some timeout like in sql_slave_killed?
|
|||
if (rgi->last_event_start_time == 0) |
|||
rgi->last_event_start_time= my_time(0); |
|||
*/ |
|||
|
|||
return true; |
|||
} |
|||
|
|||
|
|||
pthread_handler_t |
|||
handle_rpl_parallel_thread(void *arg) |
|||
{ |
|||
THD *thd; |
|||
const char* old_msg; |
|||
struct rpl_parallel_thread::queued_event *events; |
|||
bool group_standalone= true; |
|||
bool in_event_group= false; |
|||
uint64 event_gtid_sub_id= 0; |
|||
int err; |
|||
|
|||
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; |
|||
|
|||
my_thread_init(); |
|||
thd = new THD; |
|||
thd->thread_stack = (char*)&thd; |
|||
mysql_mutex_lock(&LOCK_thread_count); |
|||
thd->thread_id= thd->variables.pseudo_thread_id= thread_id++; |
|||
threads.append(thd); |
|||
mysql_mutex_unlock(&LOCK_thread_count); |
|||
set_current_thd(thd); |
|||
pthread_detach_this_thread(); |
|||
thd->init_for_queries(); |
|||
thd->variables.binlog_annotate_row_events= 0; |
|||
init_thr_lock(); |
|||
thd->store_globals(); |
|||
thd->system_thread= SYSTEM_THREAD_SLAVE_SQL; |
|||
thd->security_ctx->skip_grants(); |
|||
thd->variables.max_allowed_packet= slave_max_allowed_packet; |
|||
thd->slave_thread= 1; |
|||
thd->enable_slow_log= opt_log_slow_slave_statements; |
|||
thd->variables.log_slow_filter= global_system_variables.log_slow_filter; |
|||
set_slave_thread_options(thd); |
|||
thd->client_capabilities = CLIENT_LOCAL_FILES; |
|||
thd_proc_info(thd, "Waiting for work from main SQL threads"); |
|||
thd->set_time(); |
|||
thd->variables.lock_wait_timeout= LONG_TIMEOUT; |
|||
|
|||
mysql_mutex_lock(&rpt->LOCK_rpl_thread); |
|||
rpt->thd= thd; |
|||
|
|||
while (rpt->delay_start) |
|||
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); |
|||
|
|||
rpt->running= true; |
|||
mysql_cond_signal(&rpt->COND_rpl_thread); |
|||
|
|||
while (!rpt->stop && !thd->killed) |
|||
{ |
|||
rpl_parallel_thread *list; |
|||
|
|||
old_msg= thd->proc_info; |
|||
thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, |
|||
"Waiting for work from SQL thread"); |
|||
while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) |
|||
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); |
|||
rpt->dequeue(events); |
|||
thd->exit_cond(old_msg); |
|||
mysql_cond_signal(&rpt->COND_rpl_thread); |
|||
|
|||
more_events: |
|||
while (events) |
|||
{ |
|||
struct rpl_parallel_thread::queued_event *next= events->next; |
|||
Log_event_type event_type; |
|||
rpl_group_info *rgi= events->rgi; |
|||
rpl_parallel_entry *entry= rgi->parallel_entry; |
|||
uint64 wait_for_sub_id; |
|||
uint64 wait_start_sub_id; |
|||
bool end_of_group; |
|||
|
|||
if (!events->ev) |
|||
{ |
|||
handle_queued_pos_update(thd, events); |
|||
my_free(events); |
|||
events= next; |
|||
continue; |
|||
} |
|||
|
|||
err= 0; |
|||
/* Handle a new event group, which will be initiated by a GTID event. */ |
|||
if ((event_type= events->ev->get_type_code()) == GTID_EVENT) |
|||
{ |
|||
in_event_group= true; |
|||
/*
|
|||
If the standalone flag is set, then this event group consists of a |
|||
single statement (possibly preceeded by some Intvar_log_event and |
|||
similar), without any terminating COMMIT/ROLLBACK/XID. |
|||
*/ |
|||
group_standalone= |
|||
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & |
|||
Gtid_log_event::FL_STANDALONE)); |
|||
|
|||
/* Save this, as it gets cleared when the event group commits. */ |
|||
event_gtid_sub_id= rgi->gtid_sub_id; |
|||
|
|||
rgi->thd= thd; |
|||
|
|||
/*
|
|||
Register ourself to wait for the previous commit, if we need to do |
|||
such registration _and_ that previous commit has not already |
|||
occured. |
|||
|
|||
Also do not start parallel execution of this event group until all |
|||
prior groups have committed that are not safe to run in parallel with. |
|||
*/ |
|||
wait_for_sub_id= rgi->wait_commit_sub_id; |
|||
wait_start_sub_id= rgi->wait_start_sub_id; |
|||
if (wait_for_sub_id || wait_start_sub_id) |
|||
{ |
|||
mysql_mutex_lock(&entry->LOCK_parallel_entry); |
|||
if (wait_start_sub_id) |
|||
{ |
|||
while (wait_start_sub_id > entry->last_committed_sub_id) |
|||
mysql_cond_wait(&entry->COND_parallel_entry, |
|||
&entry->LOCK_parallel_entry); |
|||
} |
|||
rgi->wait_start_sub_id= 0; /* No need to check again. */ |
|||
if (wait_for_sub_id > entry->last_committed_sub_id) |
|||
{ |
|||
wait_for_commit *waitee= |
|||
&rgi->wait_commit_group_info->commit_orderer; |
|||
rgi->commit_orderer.register_wait_for_prior_commit(waitee); |
|||
} |
|||
mysql_mutex_unlock(&entry->LOCK_parallel_entry); |
|||
} |
|||
|
|||
if(thd->wait_for_commit_ptr) |
|||
{ |
|||
/*
|
|||
This indicates that we get a new GTID event in the middle of |
|||
a not completed event group. This is corrupt binlog (the master |
|||
will never write such binlog), so it does not happen unless |
|||
someone tries to inject wrong crafted binlog, but let us still |
|||
try to handle it somewhat nicely. |
|||
*/ |
|||
rgi->cleanup_context(thd, true); |
|||
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); |
|||
thd->wait_for_commit_ptr->wakeup_subsequent_commits(err); |
|||
} |
|||
thd->wait_for_commit_ptr= &rgi->commit_orderer; |
|||
} |
|||
|
|||
/*
|
|||
If the SQL thread is stopping, we just skip execution of all the |
|||
following event groups. We still do all the normal waiting and wakeup |
|||
processing between the event groups as a simple way to ensure that |
|||
everything is stopped and cleaned up correctly. |
|||
*/ |
|||
if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group)) |
|||
err= rpt_handle_event(events, rpt); |
|||
else |
|||
err= thd->wait_for_prior_commit(); |
|||
|
|||
end_of_group= |
|||
in_event_group && |
|||
((group_standalone && !Log_event::is_part_of_group(event_type)) || |
|||
event_type == XID_EVENT || |
|||
(event_type == QUERY_EVENT && |
|||
(((Query_log_event *)events->ev)->is_commit() || |
|||
((Query_log_event *)events->ev)->is_rollback()))); |
|||
|
|||
delete_or_keep_event_post_apply(rgi, event_type, events->ev); |
|||
my_free(events); |
|||
|
|||
if (err) |
|||
{ |
|||
rgi->is_error= true; |
|||
slave_output_error_info(rgi->rli, thd); |
|||
rgi->cleanup_context(thd, true); |
|||
rgi->rli->abort_slave= true; |
|||
} |
|||
if (end_of_group) |
|||
{ |
|||
in_event_group= false; |
|||
|
|||
/*
|
|||
Remove any left-over registration to wait for a prior commit to |
|||
complete. Normally, such wait would already have been removed at |
|||
this point by wait_for_prior_commit(), but eg. in error case we |
|||
might have skipped waiting, so we would need to remove it explicitly. |
|||
*/ |
|||
rgi->commit_orderer.unregister_wait_for_prior_commit(); |
|||
thd->wait_for_commit_ptr= NULL; |
|||
|
|||
/*
|
|||
Record that this event group has finished (eg. transaction is |
|||
committed, if transactional), so other event groups will no longer |
|||
attempt to wait for us to commit. Once we have increased |
|||
entry->last_committed_sub_id, no other threads will execute |
|||
register_wait_for_prior_commit() against us. Thus, by doing one |
|||
extra (usually redundant) wakeup_subsequent_commits() we can ensure |
|||
that no register_wait_for_prior_commit() can ever happen without a |
|||
subsequent wakeup_subsequent_commits() to wake it up. |
|||
|
|||
We can race here with the next transactions, but that is fine, as |
|||
long as we check that we do not decrease last_committed_sub_id. If |
|||
this commit is done, then any prior commits will also have been |
|||
done and also no longer need waiting for. |
|||
*/ |
|||
mysql_mutex_lock(&entry->LOCK_parallel_entry); |
|||
if (entry->last_committed_sub_id < event_gtid_sub_id) |
|||
{ |
|||
entry->last_committed_sub_id= event_gtid_sub_id; |
|||
mysql_cond_broadcast(&entry->COND_parallel_entry); |
|||
} |
|||
mysql_mutex_unlock(&entry->LOCK_parallel_entry); |
|||
|
|||
rgi->commit_orderer.wakeup_subsequent_commits(err); |
|||
delete rgi; |
|||
} |
|||
|
|||
events= next; |
|||
} |
|||
|
|||
mysql_mutex_lock(&rpt->LOCK_rpl_thread); |
|||
if ((events= rpt->event_queue) != NULL) |
|||
{ |
|||
/*
|
|||
Take next group of events from the replication pool. |
|||
This is faster than having to wakeup the pool manager thread to give us |
|||
a new event. |
|||
*/ |
|||
rpt->dequeue(events); |
|||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); |
|||
mysql_cond_signal(&rpt->COND_rpl_thread); |
|||
goto more_events; |
|||
} |
|||
|
|||
if (!in_event_group) |
|||
{ |
|||
rpt->current_entry= NULL; |
|||
if (!rpt->stop) |
|||
{ |
|||
mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool); |
|||
list= rpt->pool->free_list; |
|||
rpt->next= list; |
|||
rpt->pool->free_list= rpt; |
|||
if (!list) |
|||
mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool); |
|||
mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool); |
|||
} |
|||
} |
|||
} |
|||
|
|||
rpt->thd= NULL; |
|||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); |
|||
|
|||
thd->clear_error(); |
|||
thd->catalog= 0; |
|||
thd->reset_query(); |
|||
thd->reset_db(NULL, 0); |
|||
thd_proc_info(thd, "Slave worker thread exiting"); |
|||
thd->temporary_tables= 0; |
|||
mysql_mutex_lock(&LOCK_thread_count); |
|||
THD_CHECK_SENTRY(thd); |
|||
delete thd; |
|||
mysql_mutex_unlock(&LOCK_thread_count); |
|||
|
|||
mysql_mutex_lock(&rpt->LOCK_rpl_thread); |
|||
rpt->running= false; |
|||
mysql_cond_signal(&rpt->COND_rpl_thread); |
|||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); |
|||
|
|||
my_thread_end(); |
|||
|
|||
return NULL; |
|||
} |
|||
|
|||
|
|||
int |
|||
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, |
|||
uint32 new_count, bool skip_check) |
|||
{ |
|||
uint32 i; |
|||
rpl_parallel_thread **new_list= NULL; |
|||
rpl_parallel_thread *new_free_list= NULL; |
|||
rpl_parallel_thread *rpt_array= NULL; |
|||
|
|||
/*
|
|||
Allocate the new list of threads up-front. |
|||
That way, if we fail half-way, we only need to free whatever we managed |
|||
to allocate, and will not be left with a half-functional thread pool. |
|||
*/ |
|||
if (new_count && |
|||
!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL), |
|||
&new_list, new_count*sizeof(*new_list), |
|||
&rpt_array, new_count*sizeof(*rpt_array), |
|||
NULL)) |
|||
{ |
|||
my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) + |
|||
new_count*sizeof(*rpt_array)))); |
|||
goto err;; |
|||
} |
|||
|
|||
for (i= 0; i < new_count; ++i) |
|||
{ |
|||
pthread_t th; |
|||
|
|||
new_list[i]= &rpt_array[i]; |
|||
new_list[i]->delay_start= true; |
|||
mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread, |
|||
MY_MUTEX_INIT_SLOW); |
|||
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL); |
|||
new_list[i]->pool= pool; |
|||
if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL, |
|||
handle_rpl_parallel_thread, new_list[i])) |
|||
{ |
|||
my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
|||
goto err; |
|||
} |
|||
new_list[i]->next= new_free_list; |
|||
new_free_list= new_list[i]; |
|||
} |
|||
|
|||
if (!skip_check) |
|||
{ |
|||
mysql_mutex_lock(&LOCK_active_mi); |
|||
if (master_info_index->give_error_if_slave_running()) |
|||
{ |
|||
mysql_mutex_unlock(&LOCK_active_mi); |
|||
goto err; |
|||
} |
|||
if (pool->changing) |
|||
{ |
|||
mysql_mutex_unlock(&LOCK_active_mi); |
|||
my_error(ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE, MYF(0)); |
|||
goto err; |
|||
} |
|||
pool->changing= true; |
|||
mysql_mutex_unlock(&LOCK_active_mi); |
|||
} |
|||
|
|||
/*
|
|||
Grab each old thread in turn, and signal it to stop. |
|||
|
|||
Note that since we require all replication threads to be stopped before |
|||
changing the parallel replication worker thread pool, all the threads will |
|||
be already idle and will terminate immediately. |
|||
*/ |
|||
for (i= 0; i < pool->count; ++i) |
|||
{ |
|||
rpl_parallel_thread *rpt= pool->get_thread(NULL); |
|||
rpt->stop= true; |
|||
mysql_cond_signal(&rpt->COND_rpl_thread); |
|||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); |
|||
} |
|||
|
|||
for (i= 0; i < pool->count; ++i) |
|||
{ |
|||
rpl_parallel_thread *rpt= pool->threads[i]; |
|||
mysql_mutex_lock(&rpt->LOCK_rpl_thread); |
|||
while (rpt->running) |
|||
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); |
|||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); |
|||
mysql_mutex_destroy(&rpt->LOCK_rpl_thread); |
|||
mysql_cond_destroy(&rpt->COND_rpl_thread); |
|||
} |
|||
|
|||
my_free(pool->threads); |
|||
pool->threads= new_list; |
|||
pool->free_list= new_free_list; |
|||
pool->count= new_count; |
|||
for (i= 0; i < pool->count; ++i) |
|||
{ |
|||
mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread); |
|||
pool->threads[i]->delay_start= false; |
|||
mysql_cond_signal(&pool->threads[i]->COND_rpl_thread); |
|||
while (!pool->threads[i]->running) |
|||
mysql_cond_wait(&pool->threads[i]->COND_rpl_thread, |
|||
&pool->threads[i]->LOCK_rpl_thread); |
|||
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); |
|||
} |
|||
|
|||
if (!skip_check) |
|||
{ |
|||
mysql_mutex_lock(&LOCK_active_mi); |
|||
pool->changing= false; |
|||
mysql_mutex_unlock(&LOCK_active_mi); |
|||
} |
|||
return 0; |
|||
|
|||
err: |
|||
if (new_list) |
|||
{ |
|||
while (new_free_list) |
|||
{ |
|||
mysql_mutex_lock(&new_free_list->LOCK_rpl_thread); |
|||
new_free_list->delay_start= false; |
|||
new_free_list->stop= true; |
|||
mysql_cond_signal(&new_free_list->COND_rpl_thread); |
|||
while (!new_free_list->running) |
|||
mysql_cond_wait(&new_free_list->COND_rpl_thread, |
|||
&new_free_list->LOCK_rpl_thread); |
|||
while (new_free_list->running) |
|||
mysql_cond_wait(&new_free_list->COND_rpl_thread, |
|||
&new_free_list->LOCK_rpl_thread); |
|||
mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread); |
|||
new_free_list= new_free_list->next; |
|||
} |
|||
my_free(new_list); |
|||
} |
|||
if (!skip_check) |
|||
{ |
|||
mysql_mutex_lock(&LOCK_active_mi); |
|||
pool->changing= false; |
|||
mysql_mutex_unlock(&LOCK_active_mi); |
|||
} |
|||
return 1; |
|||
} |
|||
|
|||
|
|||
rpl_parallel_thread_pool::rpl_parallel_thread_pool() |
|||
: count(0), threads(0), free_list(0), changing(false), inited(false) |
|||
{ |
|||
} |
|||
|
|||
|
|||
int |
|||
rpl_parallel_thread_pool::init(uint32 size) |
|||
{ |
|||
count= 0; |
|||
threads= NULL; |
|||
free_list= NULL; |
|||
|
|||
mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, |
|||
MY_MUTEX_INIT_SLOW); |
|||
mysql_cond_init(key_COND_rpl_thread_pool, &COND_rpl_thread_pool, NULL); |
|||
changing= false; |
|||
inited= true; |
|||
|
|||
return rpl_parallel_change_thread_count(this, size, true); |
|||
} |
|||
|
|||
|
|||
void |
|||
rpl_parallel_thread_pool::destroy() |
|||
{ |
|||
if (!inited) |
|||
return; |
|||
rpl_parallel_change_thread_count(this, 0, true); |
|||
mysql_mutex_destroy(&LOCK_rpl_thread_pool); |
|||
mysql_cond_destroy(&COND_rpl_thread_pool); |
|||
inited= false; |
|||
} |
|||
|
|||
|
|||
/*
|
|||
Wait for a worker thread to become idle. When one does, grab the thread for |
|||
our use and return it. |
|||
|
|||
Note that we return with the worker threads's LOCK_rpl_thread mutex locked. |
|||
*/ |
|||
struct rpl_parallel_thread * |
|||
rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) |
|||
{ |
|||
rpl_parallel_thread *rpt; |
|||
|
|||
mysql_mutex_lock(&LOCK_rpl_thread_pool); |
|||
while ((rpt= free_list) == NULL) |
|||
mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); |
|||
free_list= rpt->next; |
|||
mysql_mutex_unlock(&LOCK_rpl_thread_pool); |
|||
mysql_mutex_lock(&rpt->LOCK_rpl_thread); |
|||
rpt->current_entry= entry; |
|||
|
|||
return rpt; |
|||
} |
|||
|
|||
|
|||
static void |
|||
free_rpl_parallel_entry(void *element) |
|||
{ |
|||
rpl_parallel_entry *e= (rpl_parallel_entry *)element; |
|||
mysql_cond_destroy(&e->COND_parallel_entry); |
|||
mysql_mutex_destroy(&e->LOCK_parallel_entry); |
|||
my_free(e); |
|||
} |
|||
|
|||
|
|||
rpl_parallel::rpl_parallel() : |
|||
current(NULL), sql_thread_stopping(false) |
|||
{ |
|||
my_hash_init(&domain_hash, &my_charset_bin, 32, |
|||
offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), |
|||
NULL, free_rpl_parallel_entry, HASH_UNIQUE); |
|||
} |
|||
|
|||
|
|||
void |
|||
rpl_parallel::reset() |
|||
{ |
|||
my_hash_reset(&domain_hash); |
|||
current= NULL; |
|||
sql_thread_stopping= false; |
|||
} |
|||
|
|||
|
|||
rpl_parallel::~rpl_parallel() |
|||
{ |
|||
my_hash_free(&domain_hash); |
|||
} |
|||
|
|||
|
|||
rpl_parallel_entry * |
|||
rpl_parallel::find(uint32 domain_id) |
|||
{ |
|||
struct rpl_parallel_entry *e; |
|||
|
|||
if (!(e= (rpl_parallel_entry *)my_hash_search(&domain_hash, |
|||
(const uchar *)&domain_id, 0))) |
|||
{ |
|||
/* Allocate a new, empty one. */ |
|||
if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e), |
|||
MYF(MY_ZEROFILL)))) |
|||
return NULL; |
|||
e->domain_id= domain_id; |
|||
if (my_hash_insert(&domain_hash, (uchar *)e)) |
|||
{ |
|||
my_free(e); |
|||
return NULL; |
|||
} |
|||
mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry, |
|||
MY_MUTEX_INIT_FAST); |
|||
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL); |
|||
} |
|||
|
|||
return e; |
|||
} |
|||
|
|||
|
|||
void |
|||
rpl_parallel::wait_for_done() |
|||
{ |
|||
struct rpl_parallel_entry *e; |
|||
uint32 i; |
|||
|
|||
for (i= 0; i < domain_hash.records; ++i) |
|||
{ |
|||
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); |
|||
mysql_mutex_lock(&e->LOCK_parallel_entry); |
|||
while (e->current_sub_id > e->last_committed_sub_id) |
|||
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); |
|||
mysql_mutex_unlock(&e->LOCK_parallel_entry); |
|||
} |
|||
} |
|||
|
|||
|
|||
/*
|
|||
do_event() is executed by the sql_driver_thd thread. |
|||
It's main purpose is to find a thread that can execute the query. |
|||
|
|||
@retval false ok, event was accepted |
|||
@retval true error |
|||
*/ |
|||
|
|||
bool |
|||
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, |
|||
ulonglong event_size) |
|||
{ |
|||
rpl_parallel_entry *e; |
|||
rpl_parallel_thread *cur_thread; |
|||
rpl_parallel_thread::queued_event *qev; |
|||
rpl_group_info *rgi= NULL; |
|||
Relay_log_info *rli= serial_rgi->rli; |
|||
enum Log_event_type typ; |
|||
bool is_group_event; |
|||
|
|||
/* ToDo: what to do with this lock?!? */ |
|||
mysql_mutex_unlock(&rli->data_lock); |
|||
|
|||
/*
|
|||
Stop queueing additional event groups once the SQL thread is requested to |
|||
stop. |
|||
*/ |
|||
if (((typ= ev->get_type_code()) == GTID_EVENT || |
|||
!(is_group_event= Log_event::is_group_event(typ))) && |
|||
rli->abort_slave) |
|||
sql_thread_stopping= true; |
|||
if (sql_thread_stopping) |
|||
{ |
|||
/* QQ: Need a better comment why we return false here */ |
|||
return false; |
|||
} |
|||
|
|||
if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), |
|||
MYF(0)))) |
|||
{ |
|||
my_error(ER_OUT_OF_RESOURCES, MYF(0)); |
|||
return true; |
|||
} |
|||
qev->ev= ev; |
|||
qev->event_size= event_size; |
|||
qev->next= NULL; |
|||
strcpy(qev->event_relay_log_name, rli->event_relay_log_name); |
|||
qev->event_relay_log_pos= rli->event_relay_log_pos; |
|||
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; |
|||
strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name); |
|||
|
|||
if (typ == GTID_EVENT) |
|||
{ |
|||
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); |
|||
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? |
|||
0 : gtid_ev->domain_id); |
|||
|
|||
if (!(e= find(domain_id)) || |
|||
!(rgi= new rpl_group_info(rli)) || |
|||
event_group_new_gtid(rgi, gtid_ev)) |
|||
{ |
|||
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); |
|||
delete rgi; |
|||
return true; |
|||
} |
|||
rgi->is_parallel_exec = true; |
|||
if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) |
|||
rgi->deferred_events= new Deferred_log_events(rli); |
|||
|
|||
if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) && |
|||
e->last_commit_id == gtid_ev->commit_id) |
|||
{ |
|||
/*
|
|||
We are already executing something else in this domain. But the two |
|||
event groups were committed together in the same group commit on the |
|||
master, so we can still do them in parallel here on the slave. |
|||
|
|||
However, the commit of this event must wait for the commit of the prior |
|||
event, to preserve binlog commit order and visibility across all |
|||
servers in the replication hierarchy. |
|||
*/ |
|||
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); |
|||
rgi->wait_commit_sub_id= e->current_sub_id; |
|||
rgi->wait_commit_group_info= e->current_group_info; |
|||
rgi->wait_start_sub_id= e->prev_groupcommit_sub_id; |
|||
e->rpl_thread= cur_thread= rpt; |
|||
/* get_thread() returns with the LOCK_rpl_thread locked. */ |
|||
} |
|||
else |
|||
{ |
|||
/*
|
|||
Check if we already have a worker thread for this entry. |
|||
|
|||
We continue to queue more events up for the worker thread while it is |
|||
still executing the first ones, to be able to start executing a large |
|||
event group without having to wait for the end to be fetched from the |
|||
master. And we continue to queue up more events after the first group, |
|||
so that we can continue to process subsequent parts of the relay log in |
|||
parallel without having to wait for previous long-running events to |
|||
complete. |
|||
|
|||
But if the worker thread is idle at any point, it may return to the |
|||
idle list or start servicing a different request. So check this, and |
|||
allocate a new thread if the old one is no longer processing for us. |
|||
*/ |
|||
cur_thread= e->rpl_thread; |
|||
if (cur_thread) |
|||
{ |
|||
mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); |
|||
for (;;) |
|||
{ |
|||
if (cur_thread->current_entry != e) |
|||
{ |
|||
/*
|
|||
The worker thread became idle, and returned to the free list and |
|||
possibly was allocated to a different request. This also means |
|||
that everything previously queued has already been executed, |
|||
else the worker thread would not have become idle. So we should |
|||
allocate a new worker thread. |
|||
*/ |
|||
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); |
|||
e->rpl_thread= cur_thread= NULL; |
|||
break; |
|||
} |
|||
else if (cur_thread->queued_size <= opt_slave_parallel_max_queued) |
|||
break; // The thread is ready to queue into
|
|||
else |
|||
{ |
|||
/*
|
|||
We have reached the limit of how much memory we are allowed to |
|||
use for queuing events, so wait for the thread to consume some |
|||
of its queue. |
|||
*/ |
|||
mysql_cond_wait(&cur_thread->COND_rpl_thread, |
|||
&cur_thread->LOCK_rpl_thread); |
|||
} |
|||
} |
|||
} |
|||
|
|||
if (!cur_thread) |
|||
{ |
|||
/*
|
|||
Nothing else is currently running in this domain. We can |
|||
spawn a new thread to do this event group in parallel with |
|||
anything else that might be running in other domains. |
|||
*/ |
|||
cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e); |
|||
/* get_thread() returns with the LOCK_rpl_thread locked. */ |
|||
} |
|||
else |
|||
{ |
|||
/*
|
|||
We are still executing the previous event group for this replication |
|||
domain, and we have to wait for that to finish before we can start on |
|||
the next one. So just re-use the thread. |
|||
*/ |
|||
} |
|||
|
|||
rgi->wait_commit_sub_id= 0; |
|||
rgi->wait_start_sub_id= 0; |
|||
e->prev_groupcommit_sub_id= e->current_sub_id; |
|||
} |
|||
|
|||
if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) |
|||
{ |
|||
e->last_server_id= gtid_ev->server_id; |
|||
e->last_seq_no= gtid_ev->seq_no; |
|||
e->last_commit_id= gtid_ev->commit_id; |
|||
} |
|||
else |
|||
{ |
|||
e->last_server_id= 0; |
|||
e->last_seq_no= 0; |
|||
e->last_commit_id= 0; |
|||
} |
|||
|
|||
qev->rgi= e->current_group_info= rgi; |
|||
e->current_sub_id= rgi->gtid_sub_id; |
|||
current= rgi->parallel_entry= e; |
|||
} |
|||
else if (!is_group_event || !current) |
|||
{ |
|||
my_off_t log_pos; |
|||
int err; |
|||
bool tmp; |
|||
/*
|
|||
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. |
|||
Same for events not preceeded by GTID (we should not see those normally, |
|||
but they might be from an old master). |
|||
|
|||
The varuable `current' is NULL for the case where the master did not |
|||
have GTID, like a MariaDB 5.5 or MySQL master. |
|||
*/ |
|||
qev->rgi= serial_rgi; |
|||
/* Handle master log name change, seen in Rotate_log_event. */ |
|||
if (typ == ROTATE_EVENT) |
|||
{ |
|||
Rotate_log_event *rev= static_cast<Rotate_log_event *>(qev->ev); |
|||
if ((rev->server_id != global_system_variables.server_id || |
|||
rli->replicate_same_server_id) && |
|||
!rev->is_relay_log_event() && |
|||
!rli->is_in_group()) |
|||
{ |
|||
memcpy(rli->future_event_master_log_name, |
|||
rev->new_log_ident, rev->ident_len+1); |
|||
} |
|||
} |
|||
|
|||
tmp= serial_rgi->is_parallel_exec; |
|||
serial_rgi->is_parallel_exec= true; |
|||
err= rpt_handle_event(qev, NULL); |
|||
serial_rgi->is_parallel_exec= tmp; |
|||
log_pos= qev->ev->log_pos; |
|||
delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); |
|||
|
|||
if (err) |
|||
{ |
|||
my_free(qev); |
|||
return true; |
|||
} |
|||
qev->ev= NULL; |
|||
qev->future_event_master_log_pos= log_pos; |
|||
if (!current) |
|||
{ |
|||
handle_queued_pos_update(rli->sql_driver_thd, qev); |
|||
my_free(qev); |
|||
return false; |
|||
} |
|||
/*
|
|||
Queue an empty event, so that the position will be updated in a |
|||
reasonable way relative to other events: |
|||
|
|||
- If the currently executing events are queued serially for a single |
|||
thread, the position will only be updated when everything before has |
|||
completed. |
|||
|
|||
- If we are executing multiple independent events in parallel, then at |
|||
least the position will not be updated until one of them has reached |
|||
the current point. |
|||
*/ |
|||
cur_thread= current->rpl_thread; |
|||
if (cur_thread) |
|||
{ |
|||
mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); |
|||
if (cur_thread->current_entry != current) |
|||
{ |
|||
/* Not ours anymore, we need to grab a new one. */ |
|||
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); |
|||
cur_thread= NULL; |
|||
} |
|||
} |
|||
if (!cur_thread) |
|||
cur_thread= current->rpl_thread= |
|||
global_rpl_thread_pool.get_thread(current); |
|||
} |
|||
else |
|||
{ |
|||
cur_thread= current->rpl_thread; |
|||
if (cur_thread) |
|||
{ |
|||
mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); |
|||
if (cur_thread->current_entry != current) |
|||
{ |
|||
/* Not ours anymore, we need to grab a new one. */ |
|||
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); |
|||
cur_thread= NULL; |
|||
} |
|||
} |
|||
if (!cur_thread) |
|||
{ |
|||
cur_thread= current->rpl_thread= |
|||
global_rpl_thread_pool.get_thread(current); |
|||
} |
|||
qev->rgi= current->current_group_info; |
|||
} |
|||
|
|||
/*
|
|||
Queue the event for processing. |
|||
*/ |
|||
rli->event_relay_log_pos= rli->future_event_relay_log_pos; |
|||
cur_thread->enqueue(qev); |
|||
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); |
|||
mysql_cond_signal(&cur_thread->COND_rpl_thread); |
|||
|
|||
return false; |
|||
} |
|||
@ -0,0 +1,125 @@ |
|||
#ifndef RPL_PARALLEL_H |
|||
#define RPL_PARALLEL_H |
|||
|
|||
#include "log_event.h" |
|||
|
|||
|
|||
struct rpl_parallel; |
|||
struct rpl_parallel_entry; |
|||
struct rpl_parallel_thread_pool; |
|||
|
|||
class Relay_log_info; |
|||
struct rpl_parallel_thread { |
|||
bool delay_start; |
|||
bool running; |
|||
bool stop; |
|||
mysql_mutex_t LOCK_rpl_thread; |
|||
mysql_cond_t COND_rpl_thread; |
|||
struct rpl_parallel_thread *next; /* For free list. */ |
|||
struct rpl_parallel_thread_pool *pool; |
|||
THD *thd; |
|||
struct rpl_parallel_entry *current_entry; |
|||
struct queued_event { |
|||
queued_event *next; |
|||
Log_event *ev; |
|||
rpl_group_info *rgi; |
|||
ulonglong future_event_relay_log_pos; |
|||
char event_relay_log_name[FN_REFLEN]; |
|||
char future_event_master_log_name[FN_REFLEN]; |
|||
ulonglong event_relay_log_pos; |
|||
my_off_t future_event_master_log_pos; |
|||
size_t event_size; |
|||
} *event_queue, *last_in_queue; |
|||
uint64 queued_size; |
|||
|
|||
void enqueue(queued_event *qev) |
|||
{ |
|||
if (last_in_queue) |
|||
last_in_queue->next= qev; |
|||
else |
|||
event_queue= qev; |
|||
last_in_queue= qev; |
|||
queued_size+= qev->event_size; |
|||
} |
|||
|
|||
void dequeue(queued_event *list) |
|||
{ |
|||
queued_event *tmp; |
|||
|
|||
DBUG_ASSERT(list == event_queue); |
|||
event_queue= last_in_queue= NULL; |
|||
for (tmp= list; tmp; tmp= tmp->next) |
|||
queued_size-= tmp->event_size; |
|||
} |
|||
}; |
|||
|
|||
|
|||
struct rpl_parallel_thread_pool { |
|||
uint32 count; |
|||
struct rpl_parallel_thread **threads; |
|||
struct rpl_parallel_thread *free_list; |
|||
mysql_mutex_t LOCK_rpl_thread_pool; |
|||
mysql_cond_t COND_rpl_thread_pool; |
|||
bool changing; |
|||
bool inited; |
|||
|
|||
rpl_parallel_thread_pool(); |
|||
int init(uint32 size); |
|||
void destroy(); |
|||
struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry); |
|||
}; |
|||
|
|||
|
|||
struct rpl_parallel_entry { |
|||
uint32 domain_id; |
|||
uint32 last_server_id; |
|||
uint64 last_seq_no; |
|||
uint64 last_commit_id; |
|||
bool active; |
|||
rpl_parallel_thread *rpl_thread; |
|||
/* |
|||
The sub_id of the last transaction to commit within this domain_id. |
|||
Must be accessed under LOCK_parallel_entry protection. |
|||
*/ |
|||
uint64 last_committed_sub_id; |
|||
mysql_mutex_t LOCK_parallel_entry; |
|||
mysql_cond_t COND_parallel_entry; |
|||
/* |
|||
The sub_id of the last event group in this replication domain that was |
|||
queued for execution by a worker thread. |
|||
*/ |
|||
uint64 current_sub_id; |
|||
rpl_group_info *current_group_info; |
|||
/* |
|||
The sub_id of the last event group in the previous batch of group-committed |
|||
transactions. |
|||
|
|||
When we spawn parallel worker threads for the next group-committed batch, |
|||
they first need to wait for this sub_id to be committed before it is safe |
|||
to start executing them. |
|||
*/ |
|||
uint64 prev_groupcommit_sub_id; |
|||
}; |
|||
struct rpl_parallel { |
|||
HASH domain_hash; |
|||
rpl_parallel_entry *current; |
|||
bool sql_thread_stopping; |
|||
|
|||
rpl_parallel(); |
|||
~rpl_parallel(); |
|||
void reset(); |
|||
rpl_parallel_entry *find(uint32 domain_id); |
|||
void wait_for_done(); |
|||
bool do_event(rpl_group_info *serial_rgi, Log_event *ev, |
|||
ulonglong event_size); |
|||
}; |
|||
|
|||
|
|||
extern struct rpl_parallel_thread_pool global_rpl_thread_pool; |
|||
|
|||
|
|||
extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, |
|||
uint32 new_count, |
|||
bool skip_check= false); |
|||
|
|||
#endif /* RPL_PARALLEL_H */ |
|||
687
sql/slave.cc
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
Write
Preview
Loading…
Cancel
Save
Reference in new issue