You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

213 lines
5.3 KiB

  1. /* Copyright (C) 2008 MySQL AB
  2. This program is free software; you can redistribute it and/or modify
  3. it under the terms of the GNU General Public License as published by
  4. the Free Software Foundation; version 2 of the License.
  5. This program is distributed in the hope that it will be useful,
  6. but WITHOUT ANY WARRANTY; without even the implied warranty of
  7. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  8. GNU General Public License for more details.
  9. You should have received a copy of the GNU General Public License
  10. along with this program; if not, write to the Free Software
  11. Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
  12. #ifndef RPL_HANDLER_H
  13. #define RPL_HANDLER_H
  14. #include "mysql_priv.h"
  15. #include "rpl_mi.h"
  16. #include "rpl_rli.h"
  17. #include "sql_plugin.h"
  18. #include "replication.h"
  19. class Observer_info {
  20. public:
  21. void *observer;
  22. st_plugin_int *plugin_int;
  23. plugin_ref plugin;
  24. Observer_info(void *ob, st_plugin_int *p)
  25. :observer(ob), plugin_int(p)
  26. {
  27. plugin= plugin_int_to_ref(plugin_int);
  28. }
  29. };
  30. class Delegate {
  31. public:
  32. typedef List<Observer_info> Observer_info_list;
  33. typedef List_iterator<Observer_info> Observer_info_iterator;
  34. int add_observer(void *observer, st_plugin_int *plugin)
  35. {
  36. int ret= FALSE;
  37. if (!inited)
  38. return TRUE;
  39. write_lock();
  40. Observer_info_iterator iter(observer_info_list);
  41. Observer_info *info= iter++;
  42. while (info && info->observer != observer)
  43. info= iter++;
  44. if (!info)
  45. {
  46. info= new Observer_info(observer, plugin);
  47. if (!info || observer_info_list.push_back(info, &memroot))
  48. ret= TRUE;
  49. }
  50. else
  51. ret= TRUE;
  52. unlock();
  53. return ret;
  54. }
  55. int remove_observer(void *observer, st_plugin_int *plugin)
  56. {
  57. int ret= FALSE;
  58. if (!inited)
  59. return TRUE;
  60. write_lock();
  61. Observer_info_iterator iter(observer_info_list);
  62. Observer_info *info= iter++;
  63. while (info && info->observer != observer)
  64. info= iter++;
  65. if (info)
  66. iter.remove();
  67. else
  68. ret= TRUE;
  69. unlock();
  70. return ret;
  71. }
  72. inline Observer_info_iterator observer_info_iter()
  73. {
  74. return Observer_info_iterator(observer_info_list);
  75. }
  76. inline bool is_empty()
  77. {
  78. return observer_info_list.is_empty();
  79. }
  80. inline int read_lock()
  81. {
  82. if (!inited)
  83. return TRUE;
  84. return rw_rdlock(&lock);
  85. }
  86. inline int write_lock()
  87. {
  88. if (!inited)
  89. return TRUE;
  90. return rw_wrlock(&lock);
  91. }
  92. inline int unlock()
  93. {
  94. if (!inited)
  95. return TRUE;
  96. return rw_unlock(&lock);
  97. }
  98. inline bool is_inited()
  99. {
  100. return inited;
  101. }
  102. Delegate()
  103. {
  104. inited= FALSE;
  105. if (my_rwlock_init(&lock, NULL))
  106. return;
  107. init_sql_alloc(&memroot, 1024, 0);
  108. inited= TRUE;
  109. }
  110. ~Delegate()
  111. {
  112. inited= FALSE;
  113. rwlock_destroy(&lock);
  114. free_root(&memroot, MYF(0));
  115. }
  116. private:
  117. Observer_info_list observer_info_list;
  118. rw_lock_t lock;
  119. MEM_ROOT memroot;
  120. bool inited;
  121. };
  122. class Trans_delegate
  123. :public Delegate {
  124. public:
  125. typedef Trans_observer Observer;
  126. int before_commit(THD *thd, bool all);
  127. int before_rollback(THD *thd, bool all);
  128. int after_commit(THD *thd, bool all);
  129. int after_rollback(THD *thd, bool all);
  130. };
  131. class Binlog_storage_delegate
  132. :public Delegate {
  133. public:
  134. typedef Binlog_storage_observer Observer;
  135. int after_flush(THD *thd, const char *log_file,
  136. my_off_t log_pos, bool synced);
  137. };
  138. #ifdef HAVE_REPLICATION
  139. class Binlog_transmit_delegate
  140. :public Delegate {
  141. public:
  142. typedef Binlog_transmit_observer Observer;
  143. int transmit_start(THD *thd, ushort flags,
  144. const char *log_file, my_off_t log_pos);
  145. int transmit_stop(THD *thd, ushort flags);
  146. int reserve_header(THD *thd, ushort flags, String *packet);
  147. int before_send_event(THD *thd, ushort flags,
  148. String *packet, const
  149. char *log_file, my_off_t log_pos );
  150. int after_send_event(THD *thd, ushort flags,
  151. String *packet);
  152. int after_reset_master(THD *thd, ushort flags);
  153. };
  154. class Binlog_relay_IO_delegate
  155. :public Delegate {
  156. public:
  157. typedef Binlog_relay_IO_observer Observer;
  158. int thread_start(THD *thd, Master_info *mi);
  159. int thread_stop(THD *thd, Master_info *mi);
  160. int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
  161. int after_read_event(THD *thd, Master_info *mi,
  162. const char *packet, ulong len,
  163. const char **event_buf, ulong *event_len);
  164. int after_queue_event(THD *thd, Master_info *mi,
  165. const char *event_buf, ulong event_len,
  166. bool synced);
  167. int after_reset_slave(THD *thd, Master_info *mi);
  168. private:
  169. void init_param(Binlog_relay_IO_param *param, Master_info *mi);
  170. };
  171. #endif /* HAVE_REPLICATION */
  172. int delegates_init();
  173. void delegates_destroy();
  174. extern Trans_delegate *transaction_delegate;
  175. extern Binlog_storage_delegate *binlog_storage_delegate;
  176. #ifdef HAVE_REPLICATION
  177. extern Binlog_transmit_delegate *binlog_transmit_delegate;
  178. extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
  179. #endif /* HAVE_REPLICATION */
  180. /*
  181. if there is no observers in the delegate, we can return 0
  182. immediately.
  183. */
  184. #define RUN_HOOK(group, hook, args) \
  185. (group ##_delegate->is_empty() ? \
  186. 0 : group ##_delegate->hook args)
  187. #endif /* RPL_HANDLER_H */