From 15f46d517435f3570e2c788349637a06d818a619 Mon Sep 17 00:00:00 2001 From: Sachin Setiya Date: Mon, 23 Jan 2017 22:27:42 +0530 Subject: [PATCH] MDEV-7409 On RBR, extend the PROCESSLIST info to include at least the name of the recently used table When RBR is used, add the db name to db Field and table name to Status Field of the "SHOW FULL PROCESSLIST" command for SQL thread. --- mysql-test/suite/rpl/r/rpl_rbr_monitor.result | 62 ++++++++++++++ mysql-test/suite/rpl/t/rpl_rbr_monitor.test | 79 ++++++++++++++++++ sql/log_event.cc | 83 ++++++++++++++----- 3 files changed, 204 insertions(+), 20 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_rbr_monitor.result create mode 100644 mysql-test/suite/rpl/t/rpl_rbr_monitor.test diff --git a/mysql-test/suite/rpl/r/rpl_rbr_monitor.result b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result new file mode 100644 index 00000000000..0aa9e7807c1 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_rbr_monitor.result @@ -0,0 +1,62 @@ +include/master-slave.inc +[connection master] +connection master; +create table t1(a int primary key); +connection slave; +SET GLOBAL debug_dbug="+d,should_sleep_for_mdev7409"; +select * from t1; +a +connection master; +insert into t1(a) values(1); +#monitoring write rows +connection slave; +SELECT db , state FROM INFORMATION_SCHEMA.PROCESSLIST +WHERE DB = 'test' AND STATE LIKE "Write_rows_log_event::write_row(%) on table t1"; +db state +test Write_rows_log_event::write_row(-1) on table t1 +#monitoring update rows +connection master; +update t1 set a = a + 4194304 ; +connection slave; +SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST +WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::find_row(%) on table t1"; +db state +test Update_rows_log_event::find_row(-1) on table t1 +SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST +WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::unpack_current_row(%) on table t1"; +db state +test Update_rows_log_event::unpack_current_row(-1) on table t1 +SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST +WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::ha_update_row(%) on table t1"; +db state +test Update_rows_log_event::ha_update_row(-1) on table t1 +#monitoring delete rows +connection master; +delete from t1 where a>1; +connection slave; +SELECT db , state FROM INFORMATION_SCHEMA.PROCESSLIST +WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::find_row(%) on table t1"; +db state +test Delete_rows_log_event::find_row(-1) on table t1 +SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST +WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::ha_delete_row(%) on table t1"; +db state +test Delete_rows_log_event::ha_delete_row(-1) on table t1 +connection master; +drop table t1; +connection slave; +SET GLOBAL debug_dbug=""; +include/rpl_end.inc +connection server_2; +connection server_2; +connection server_2; +connection server_2; +connection server_1; +connection server_1; +connection server_1; +connection server_2; +connection server_1; +connection server_2; +connection server_2; +connection server_1; +connection server_1; diff --git a/mysql-test/suite/rpl/t/rpl_rbr_monitor.test b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test new file mode 100644 index 00000000000..becd6599124 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_rbr_monitor.test @@ -0,0 +1,79 @@ +--source include/have_innodb.inc +--source include/have_binlog_format_row.inc +--source include/master-slave.inc +--enable_connect_log + +--connection master +create table t1(a int primary key); +--save_master_pos + +--connection slave +--sync_with_master +SET GLOBAL debug_dbug="+d,should_sleep_for_mdev7409"; +select * from t1; + +--connection master +insert into t1(a) values(1); +--save_master_pos + +--echo #monitoring write rows +--connection slave + +let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Write_rows_log_event::write_row(%) on table t1"; +--source include/wait_condition.inc +SELECT db , state FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Write_rows_log_event::write_row(%) on table t1"; +--sync_with_master + +--echo #monitoring update rows +--connection master +update t1 set a = a + 4194304 ; + +--connection slave +let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::find_row(%) on table t1"; +--source include/wait_condition.inc +SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::find_row(%) on table t1"; + +let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::unpack_current_row(%) on table t1"; +--source include/wait_condition.inc +SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::unpack_current_row(%) on table t1"; + +let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::ha_update_row(%) on table t1"; +--source include/wait_condition.inc +SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Update_rows_log_event::ha_update_row(%) on table t1"; + +--source include/wait_condition.inc +--sync_with_master + +--echo #monitoring delete rows +--connection master +delete from t1 where a>1; + +--connection slave +let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::find_row(%) on table t1"; +--source include/wait_condition.inc +SELECT db , state FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::find_row(%) on table t1"; + +let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::ha_delete_row(%) on table t1"; +--source include/wait_condition.inc +SELECT db, state FROM INFORMATION_SCHEMA.PROCESSLIST + WHERE DB = 'test' AND STATE LIKE "Delete_rows_log_event::ha_delete_row(%) on table t1"; + +--sync_with_master + +#CleanUp +--connection master +drop table t1; +--connection slave +SET GLOBAL debug_dbug=""; +--source include/rpl_end.inc diff --git a/sql/log_event.cc b/sql/log_event.cc index 21b5de2725e..05112712322 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -11765,18 +11765,27 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); const char *tmp= thd->get_proc_info(); - const char *message= "Write_rows_log_event::write_row()"; + char *tmp_db= thd->db; + char *message, msg[128]; + my_snprintf(msg, sizeof(msg),"Write_rows_log_event::write_row() on table %s", + m_table->s->table_name.str); + thd->db= m_table->s->db.str; + message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Write_rows_log_event::write_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Write_rows_log_event::write_row(%lld) on table %s", + (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str); message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + my_sleep(500000); + };); int error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); thd_proc_info(thd, tmp); + thd->db= tmp_db; if (error && !thd->is_error()) { @@ -12372,32 +12381,45 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) { int error; const char *tmp= thd->get_proc_info(); - const char *message= "Delete_rows_log_event::find_row()"; + char *tmp_db= thd->db; + char *message, msg[128]; + my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::find_row() on table %s", + m_table->s->table_name.str); + thd->db= m_table->s->db.str; + message= msg; const bool invoke_triggers= slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers; DBUG_ASSERT(m_table != NULL); #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Delete_rows_log_event::find_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Delete_rows_log_event::find_row(%lld) on table %s", + (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ; message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + my_sleep(500000); + };); if (!(error= find_row(rgi))) { /* Delete the record found, located in record[0] */ - message= "Delete_rows_log_event::ha_delete_row()"; + my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::ha_delete_row() on table %s", + m_table->s->table_name.str); + message= msg; #ifdef WSREP_PROC_INFO snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Delete_rows_log_event::ha_delete_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Delete_rows_log_event::ha_delete_row(%lld) on table %s", + (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ; message= thd->wsrep_info; #endif thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + my_sleep(500000); + };); if (invoke_triggers && process_triggers(TRG_EVENT_DELETE, TRG_ACTION_BEFORE, FALSE)) @@ -12414,6 +12436,7 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) m_table->file->ha_index_or_rnd_end(); } thd_proc_info(thd, tmp); + thd->db= tmp_db; return error; } @@ -12532,17 +12555,26 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) const bool invoke_triggers= slave_run_triggers_for_rbr && !master_had_triggers && m_table->triggers; const char *tmp= thd->get_proc_info(); - const char *message= "Update_rows_log_event::find_row()"; + char *tmp_db= thd->db; + char *message, msg[128]; DBUG_ASSERT(m_table != NULL); + my_snprintf(msg, sizeof(msg),"Update_rows_log_event::find_row() on table %s", + m_table->s->table_name.str); + thd->db= m_table->s->db.str; + message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Update_rows_log_event::find_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Update_rows_log_event::find_row(%lld) on table %s", + (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ; message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + my_sleep(500000); + };); + int error= find_row(rgi); if (error) { @@ -12553,6 +12585,7 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) if ((m_curr_row= m_curr_row_end)) unpack_current_row(rgi, &m_cols_ai); thd_proc_info(thd, tmp); + thd->db= tmp_db; return error; } @@ -12570,16 +12603,21 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) store_record(m_table,record[1]); m_curr_row= m_curr_row_end; - message= "Update_rows_log_event::unpack_current_row()"; + my_snprintf(msg, sizeof(msg),"Update_rows_log_event::unpack_current_row() on table %s", + m_table->s->table_name.str); + message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Update_rows_log_event::unpack_current_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Update_rows_log_event::unpack_current_row(%lld) on table %s", + (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ; message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ /* this also updates m_curr_row_end */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + my_sleep(500000); + };); if ((error= unpack_current_row(rgi, &m_cols_ai))) goto err; @@ -12597,15 +12635,20 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); #endif - message= "Update_rows_log_event::ha_update_row()"; + my_snprintf(msg, sizeof(msg),"Update_rows_log_event::ha_update_row() on table %s", + m_table->s->table_name.str); + message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Update_rows_log_event::ha_update_row(%lld)", - (long long) wsrep_thd_trx_seqno(thd)); + "Update_rows_log_event::ha_update_row(%lld) on table %s", + (long long) wsrep_thd_trx_seqno(thd), m_table->s->table_name.str) ; message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ thd_proc_info(thd, message); + DBUG_EXECUTE_IF("should_sleep_for_mdev7409",{ + my_sleep(500000); + };); if (invoke_triggers && process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_BEFORE, TRUE)) { @@ -12627,9 +12670,9 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) process_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER, TRUE)) error= HA_ERR_GENERIC; // in case if error is not set yet - thd_proc_info(thd, tmp); - err: + thd_proc_info(thd, tmp); + thd->db= tmp_db; m_table->file->ha_index_or_rnd_end(); return error; }