You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

189 lines
4.5 KiB

  1. /******************************************************
  2. Copyright (c) 2012-2013 Percona LLC and/or its affiliates.
  3. buffer datasink for XtraBackup.
  4. This program is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published by
  6. the Free Software Foundation; version 2 of the License.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU General Public License for more details.
  11. You should have received a copy of the GNU General Public License
  12. along with this program; if not, write to the Free Software
  13. Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
  14. *******************************************************/
  15. /* Does buffered output to a destination datasink set with ds_set_pipe().
  16. Writes to the destination datasink are guaranteed to not be smaller than a
  17. specified buffer size (DS_DEFAULT_BUFFER_SIZE by default), with the only
  18. exception for the last write for a file. */
  19. #include <mysql_version.h>
  20. #include <my_base.h>
  21. #include "ds_buffer.h"
  22. #include "common.h"
  23. #include "datasink.h"
  24. #define DS_DEFAULT_BUFFER_SIZE (64 * 1024)
  25. typedef struct {
  26. ds_file_t *dst_file;
  27. char *buf;
  28. size_t pos;
  29. size_t size;
  30. } ds_buffer_file_t;
  31. typedef struct {
  32. size_t buffer_size;
  33. } ds_buffer_ctxt_t;
  34. static ds_ctxt_t *buffer_init(const char *root);
  35. static ds_file_t *buffer_open(ds_ctxt_t *ctxt, const char *path,
  36. MY_STAT *mystat);
  37. static int buffer_write(ds_file_t *file, const void *buf, size_t len);
  38. static int buffer_close(ds_file_t *file);
  39. static void buffer_deinit(ds_ctxt_t *ctxt);
  40. datasink_t datasink_buffer = {
  41. &buffer_init,
  42. &buffer_open,
  43. &buffer_write,
  44. &buffer_close,
  45. &buffer_deinit
  46. };
  47. /* Change the default buffer size */
  48. void ds_buffer_set_size(ds_ctxt_t *ctxt, size_t size)
  49. {
  50. ds_buffer_ctxt_t *buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
  51. buffer_ctxt->buffer_size = size;
  52. }
  53. static ds_ctxt_t *
  54. buffer_init(const char *root)
  55. {
  56. ds_ctxt_t *ctxt;
  57. ds_buffer_ctxt_t *buffer_ctxt;
  58. ctxt = my_malloc(sizeof(ds_ctxt_t) + sizeof(ds_buffer_ctxt_t),
  59. MYF(MY_FAE));
  60. buffer_ctxt = (ds_buffer_ctxt_t *) (ctxt + 1);
  61. buffer_ctxt->buffer_size = DS_DEFAULT_BUFFER_SIZE;
  62. ctxt->ptr = buffer_ctxt;
  63. ctxt->root = my_strdup(root, MYF(MY_FAE));
  64. return ctxt;
  65. }
  66. static ds_file_t *
  67. buffer_open(ds_ctxt_t *ctxt, const char *path, MY_STAT *mystat)
  68. {
  69. ds_buffer_ctxt_t *buffer_ctxt;
  70. ds_ctxt_t *pipe_ctxt;
  71. ds_file_t *dst_file;
  72. ds_file_t *file;
  73. ds_buffer_file_t *buffer_file;
  74. pipe_ctxt = ctxt->pipe_ctxt;
  75. xb_a(pipe_ctxt != NULL);
  76. dst_file = ds_open(pipe_ctxt, path, mystat);
  77. if (dst_file == NULL) {
  78. exit(EXIT_FAILURE);
  79. }
  80. buffer_ctxt = (ds_buffer_ctxt_t *) ctxt->ptr;
  81. file = (ds_file_t *) my_malloc(sizeof(ds_file_t) +
  82. sizeof(ds_buffer_file_t) +
  83. buffer_ctxt->buffer_size,
  84. MYF(MY_FAE));
  85. buffer_file = (ds_buffer_file_t *) (file + 1);
  86. buffer_file->dst_file = dst_file;
  87. buffer_file->buf = (char *) (buffer_file + 1);
  88. buffer_file->size = buffer_ctxt->buffer_size;
  89. buffer_file->pos = 0;
  90. file->path = dst_file->path;
  91. file->ptr = buffer_file;
  92. return file;
  93. }
  94. static int
  95. buffer_write(ds_file_t *file, const void *buf, size_t len)
  96. {
  97. ds_buffer_file_t *buffer_file;
  98. buffer_file = (ds_buffer_file_t *) file->ptr;
  99. while (len > 0) {
  100. if (buffer_file->pos + len > buffer_file->size) {
  101. if (buffer_file->pos > 0) {
  102. size_t bytes;
  103. bytes = buffer_file->size - buffer_file->pos;
  104. memcpy(buffer_file->buf + buffer_file->pos, buf,
  105. bytes);
  106. if (ds_write(buffer_file->dst_file,
  107. buffer_file->buf,
  108. buffer_file->size)) {
  109. return 1;
  110. }
  111. buffer_file->pos = 0;
  112. buf = (const char *) buf + bytes;
  113. len -= bytes;
  114. } else {
  115. /* We don't have any buffered bytes, just write
  116. the entire source buffer */
  117. if (ds_write(buffer_file->dst_file, buf, len)) {
  118. return 1;
  119. }
  120. break;
  121. }
  122. } else {
  123. memcpy(buffer_file->buf + buffer_file->pos, buf, len);
  124. buffer_file->pos += len;
  125. break;
  126. }
  127. }
  128. return 0;
  129. }
  130. static int
  131. buffer_close(ds_file_t *file)
  132. {
  133. ds_buffer_file_t *buffer_file;
  134. int ret;
  135. buffer_file = (ds_buffer_file_t *) file->ptr;
  136. if (buffer_file->pos > 0) {
  137. ds_write(buffer_file->dst_file, buffer_file->buf,
  138. buffer_file->pos);
  139. }
  140. ret = ds_close(buffer_file->dst_file);
  141. my_free(file);
  142. return ret;
  143. }
  144. static void
  145. buffer_deinit(ds_ctxt_t *ctxt)
  146. {
  147. my_free(ctxt->root);
  148. my_free(ctxt);
  149. }