mirror of https://github.com/MariaDB/server
				
				
			
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							493 lines
						
					
					
						
							15 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							493 lines
						
					
					
						
							15 KiB
						
					
					
				| /* Copyright (C) 2008 MySQL AB | |
|  | |
|    This program is free software; you can redistribute it and/or modify | |
|    it under the terms of the GNU General Public License as published by | |
|    the Free Software Foundation; version 2 of the License. | |
|  | |
|    This program is distributed in the hope that it will be useful, | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | |
|    GNU General Public License for more details. | |
|  | |
|    You should have received a copy of the GNU General Public License | |
|    along with this program; if not, write to the Free Software | |
|    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */ | |
| 
 | |
| #include "mysql_priv.h" | |
|  | |
| #include "rpl_mi.h" | |
| #include "sql_repl.h" | |
| #include "log_event.h" | |
| #include "rpl_filter.h" | |
| #include <my_dir.h> | |
| #include "rpl_handler.h" | |
|  | |
| Trans_delegate *transaction_delegate; | |
| Binlog_storage_delegate *binlog_storage_delegate; | |
| #ifdef HAVE_REPLICATION | |
| Binlog_transmit_delegate *binlog_transmit_delegate; | |
| Binlog_relay_IO_delegate *binlog_relay_io_delegate; | |
| #endif /* HAVE_REPLICATION */ | |
|  | |
| /* | |
|   structure to save transaction log filename and position | |
| */ | |
| typedef struct Trans_binlog_info { | |
|   my_off_t log_pos; | |
|   char log_file[FN_REFLEN]; | |
| } Trans_binlog_info; | |
| 
 | |
| static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); | |
| 
 | |
| int get_user_var_int(const char *name, | |
|                      long long int *value, int *null_value) | |
| { | |
|   my_bool null_val; | |
|   user_var_entry *entry=  | |
|     (user_var_entry*) my_hash_search(¤t_thd->user_vars, | |
|                                   (uchar*) name, strlen(name)); | |
|   if (!entry) | |
|     return 1; | |
|   *value= entry->val_int(&null_val); | |
|   if (null_value) | |
|     *null_value= null_val; | |
|   return 0; | |
| } | |
| 
 | |
| int get_user_var_real(const char *name, | |
|                       double *value, int *null_value) | |
| { | |
|   my_bool null_val; | |
|   user_var_entry *entry=  | |
|     (user_var_entry*) my_hash_search(¤t_thd->user_vars, | |
|                                   (uchar*) name, strlen(name)); | |
|   if (!entry) | |
|     return 1; | |
|   *value= entry->val_real(&null_val); | |
|   if (null_value) | |
|     *null_value= null_val; | |
|   return 0; | |
| } | |
| 
 | |
| int get_user_var_str(const char *name, char *value, | |
|                      size_t len, unsigned int precision, int *null_value) | |
| { | |
|   String str; | |
|   my_bool null_val; | |
|   user_var_entry *entry=  | |
|     (user_var_entry*) my_hash_search(¤t_thd->user_vars, | |
|                                   (uchar*) name, strlen(name)); | |
|   if (!entry) | |
|     return 1; | |
|   entry->val_str(&null_val, &str, precision); | |
|   strncpy(value, str.c_ptr(), len); | |
|   if (null_value) | |
|     *null_value= null_val; | |
|   return 0; | |
| } | |
| 
 | |
| int delegates_init() | |
| { | |
|   static unsigned long trans_mem[sizeof(Trans_delegate) / sizeof(unsigned long) + 1]; | |
|   static unsigned long storage_mem[sizeof(Binlog_storage_delegate) / sizeof(unsigned long) + 1]; | |
| #ifdef HAVE_REPLICATION | |
|   static unsigned long transmit_mem[sizeof(Binlog_transmit_delegate) / sizeof(unsigned long) + 1]; | |
|   static unsigned long relay_io_mem[sizeof(Binlog_relay_IO_delegate)/ sizeof(unsigned long) + 1]; | |
| #endif | |
|    | |
|   if (!(transaction_delegate= new (trans_mem) Trans_delegate) | |
|       || (!transaction_delegate->is_inited()) | |
|       || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate) | |
|       || (!binlog_storage_delegate->is_inited()) | |
| #ifdef HAVE_REPLICATION | |
|       || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate) | |
|       || (!binlog_transmit_delegate->is_inited()) | |
|       || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate) | |
|       || (!binlog_relay_io_delegate->is_inited()) | |
| #endif /* HAVE_REPLICATION */ | |
|       ) | |
|     return 1; | |
| 
 | |
|   if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL)) | |
|     return 1; | |
|   return 0; | |
| } | |
| 
 | |
| void delegates_destroy() | |
| { | |
|   if (transaction_delegate) | |
|     transaction_delegate->~Trans_delegate(); | |
|   if (binlog_storage_delegate) | |
|     binlog_storage_delegate->~Binlog_storage_delegate(); | |
| #ifdef HAVE_REPLICATION | |
|   if (binlog_transmit_delegate) | |
|     binlog_transmit_delegate->~Binlog_transmit_delegate(); | |
|   if (binlog_relay_io_delegate) | |
|     binlog_relay_io_delegate->~Binlog_relay_IO_delegate(); | |
| #endif /* HAVE_REPLICATION */ | |
| } | |
| 
 | |
| /* | |
|   This macro is used by almost all the Delegate methods to iterate | |
|   over all the observers running given callback function of the | |
|   delegate . | |
|    | |
|   Add observer plugins to the thd->lex list, after each statement, all | |
|   plugins add to thd->lex will be automatically unlocked. | |
|  */ | |
| #define FOREACH_OBSERVER(r, f, thd, args)                               \ | |
|   param.server_id= thd->server_id;                                      \ | |
|   read_lock();                                                          \ | |
|   Observer_info_iterator iter= observer_info_iter();                    \ | |
|   Observer_info *info= iter++;                                          \ | |
|   for (; info; info= iter++)                                            \ | |
|   {                                                                     \ | |
|     plugin_ref plugin=                                                  \ | |
|       my_plugin_lock(thd, &info->plugin);                               \ | |
|     if (!plugin)                                                        \ | |
|     {                                                                   \ | |
|       r= 1;                                                             \ | |
|       break;                                                            \ | |
|     }                                                                   \ | |
|     if (((Observer *)info->observer)->f                                 \ | |
|         && ((Observer *)info->observer)->f args)                        \ | |
|     {                                                                   \ | |
|       r= 1;                                                             \ | |
|       plugin_unlock(thd, plugin);                                       \ | |
|       break;                                                            \ | |
|     }                                                                   \ | |
|     plugin_unlock(thd, plugin);                                         \ | |
|   }                                                                     \ | |
|   unlock() | |
|  | |
| 
 | |
| int Trans_delegate::after_commit(THD *thd, bool all) | |
| { | |
|   Trans_param param; | |
|   bool is_real_trans= (all || thd->transaction.all.ha_list == 0); | |
|   if (is_real_trans) | |
|     param.flags |= TRANS_IS_REAL_TRANS; | |
| 
 | |
|   Trans_binlog_info *log_info= | |
|     my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); | |
| 
 | |
|   param.log_file= log_info ? log_info->log_file : 0; | |
|   param.log_pos= log_info ? log_info->log_pos : 0; | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); | |
| 
 | |
|   /* | |
|     This is the end of a real transaction or autocommit statement, we | |
|     can free the memory allocated for binlog file and position. | |
|   */ | |
|   if (is_real_trans && log_info) | |
|   { | |
|     my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); | |
|     my_free(log_info, MYF(0)); | |
|   } | |
|   return ret; | |
| } | |
| 
 | |
| int Trans_delegate::after_rollback(THD *thd, bool all) | |
| { | |
|   Trans_param param; | |
|   bool is_real_trans= (all || thd->transaction.all.ha_list == 0); | |
|   if (is_real_trans) | |
|     param.flags |= TRANS_IS_REAL_TRANS; | |
| 
 | |
|   Trans_binlog_info *log_info= | |
|     my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); | |
|      | |
|   param.log_file= log_info ? log_info->log_file : 0; | |
|   param.log_pos= log_info ? log_info->log_pos : 0; | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); | |
| 
 | |
|   /* | |
|     This is the end of a real transaction or autocommit statement, we | |
|     can free the memory allocated for binlog file and position. | |
|   */ | |
|   if (is_real_trans && log_info) | |
|   { | |
|     my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); | |
|     my_free(log_info, MYF(0)); | |
|   } | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_storage_delegate::after_flush(THD *thd, | |
|                                          const char *log_file, | |
|                                          my_off_t log_pos, | |
|                                          bool synced) | |
| { | |
|   Binlog_storage_param param; | |
|   uint32 flags=0; | |
|   if (synced) | |
|     flags |= BINLOG_STORAGE_IS_SYNCED; | |
| 
 | |
|   Trans_binlog_info *log_info= | |
|     my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); | |
|      | |
|   if (!log_info) | |
|   { | |
|     if(!(log_info= | |
|          (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0)))) | |
|       return 1; | |
|     my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info); | |
|   } | |
|      | |
|   strcpy(log_info->log_file, log_file+dirname_length(log_file)); | |
|   log_info->log_pos = log_pos; | |
|    | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, after_flush, thd, | |
|                    (¶m, log_info->log_file, log_info->log_pos, flags)); | |
|   return ret; | |
| } | |
| 
 | |
| #ifdef HAVE_REPLICATION | |
| int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, | |
|                                              const char *log_file, | |
|                                              my_off_t log_pos) | |
| { | |
|   Binlog_transmit_param param; | |
|   param.flags= flags; | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos)); | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) | |
| { | |
|   Binlog_transmit_param param; | |
|   param.flags= flags; | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m)); | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, | |
|                                              String *packet) | |
| { | |
|   /* NOTE2ME: Maximum extra header size for each observer, I hope 32 | |
|      bytes should be enough for each Observer to reserve their extra | |
|      header. If later found this is not enough, we can increase this | |
|      /HEZX | |
|   */ | |
| #define RESERVE_HEADER_SIZE 32 | |
|   unsigned char header[RESERVE_HEADER_SIZE]; | |
|   ulong hlen; | |
|   Binlog_transmit_param param; | |
|   param.flags= flags; | |
|   param.server_id= thd->server_id; | |
| 
 | |
|   int ret= 0; | |
|   read_lock(); | |
|   Observer_info_iterator iter= observer_info_iter(); | |
|   Observer_info *info= iter++; | |
|   for (; info; info= iter++) | |
|   { | |
|     plugin_ref plugin= | |
|       my_plugin_lock(thd, &info->plugin); | |
|     if (!plugin) | |
|     { | |
|       ret= 1; | |
|       break; | |
|     } | |
|     hlen= 0; | |
|     if (((Observer *)info->observer)->reserve_header | |
|         && ((Observer *)info->observer)->reserve_header(¶m, | |
|                                                         header, | |
|                                                         RESERVE_HEADER_SIZE, | |
|                                                         &hlen)) | |
|     { | |
|       ret= 1; | |
|       plugin_unlock(thd, plugin); | |
|       break; | |
|     } | |
|     plugin_unlock(thd, plugin); | |
|     if (hlen == 0) | |
|       continue; | |
|     if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) | |
|     { | |
|       ret= 1; | |
|       break; | |
|     } | |
|   } | |
|   unlock(); | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, | |
|                                                 String *packet, | |
|                                                 const char *log_file, | |
|                                                 my_off_t log_pos) | |
| { | |
|   Binlog_transmit_param param; | |
|   param.flags= flags; | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, before_send_event, thd, | |
|                    (¶m, (uchar *)packet->c_ptr(), | |
|                     packet->length(), | |
|                     log_file+dirname_length(log_file), log_pos)); | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags, | |
|                                                String *packet) | |
| { | |
|   Binlog_transmit_param param; | |
|   param.flags= flags; | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, after_send_event, thd, | |
|                    (¶m, packet->c_ptr(), packet->length())); | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) | |
| 
 | |
| { | |
|   Binlog_transmit_param param; | |
|   param.flags= flags; | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m)); | |
|   return ret; | |
| } | |
| 
 | |
| void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param, | |
|                                           Master_info *mi) | |
| { | |
|   param->mysql= mi->mysql; | |
|   param->user= mi->user; | |
|   param->host= mi->host; | |
|   param->port= mi->port; | |
|   param->master_log_name= mi->master_log_name; | |
|   param->master_log_pos= mi->master_log_pos; | |
| } | |
| 
 | |
| int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) | |
| { | |
|   Binlog_relay_IO_param param; | |
|   init_param(¶m, mi); | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, thread_start, thd, (¶m)); | |
|   return ret; | |
| } | |
| 
 | |
| 
 | |
| int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) | |
| { | |
| 
 | |
|   Binlog_relay_IO_param param; | |
|   init_param(¶m, mi); | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, thread_stop, thd, (¶m)); | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, | |
|                                                       Master_info *mi, | |
|                                                       ushort flags) | |
| { | |
|   Binlog_relay_IO_param param; | |
|   init_param(¶m, mi); | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags)); | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi, | |
|                                                const char *packet, ulong len, | |
|                                                const char **event_buf, | |
|                                                ulong *event_len) | |
| { | |
|   Binlog_relay_IO_param param; | |
|   init_param(¶m, mi); | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, after_read_event, thd, | |
|                    (¶m, packet, len, event_buf, event_len)); | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi, | |
|                                                 const char *event_buf, | |
|                                                 ulong event_len, | |
|                                                 bool synced) | |
| { | |
|   Binlog_relay_IO_param param; | |
|   init_param(¶m, mi); | |
| 
 | |
|   uint32 flags=0; | |
|   if (synced) | |
|     flags |= BINLOG_STORAGE_IS_SYNCED; | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, after_queue_event, thd, | |
|                    (¶m, event_buf, event_len, flags)); | |
|   return ret; | |
| } | |
| 
 | |
| int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi) | |
| 
 | |
| { | |
|   Binlog_relay_IO_param param; | |
|   init_param(¶m, mi); | |
| 
 | |
|   int ret= 0; | |
|   FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m)); | |
|   return ret; | |
| } | |
| #endif /* HAVE_REPLICATION */ | |
|  | |
| int register_trans_observer(Trans_observer *observer, void *p) | |
| { | |
|   return transaction_delegate->add_observer(observer, (st_plugin_int *)p); | |
| } | |
| 
 | |
| int unregister_trans_observer(Trans_observer *observer, void *p) | |
| { | |
|   return transaction_delegate->remove_observer(observer, (st_plugin_int *)p); | |
| } | |
| 
 | |
| int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p) | |
| { | |
|   return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p); | |
| } | |
| 
 | |
| int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p) | |
| { | |
|   return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p); | |
| } | |
| 
 | |
| #ifdef HAVE_REPLICATION | |
| int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) | |
| { | |
|   return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p); | |
| } | |
| 
 | |
| int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) | |
| { | |
|   return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p); | |
| } | |
| 
 | |
| int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) | |
| { | |
|   return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p); | |
| } | |
| 
 | |
| int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) | |
| { | |
|   return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p); | |
| } | |
| #endif /* HAVE_REPLICATION */
 |