You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
5.3 KiB

  1. /* -*- mode: C; c-basic-offset: 4 -*- */
  2. #ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
  3. #ident "$Id$"
  4. #include <toku_portability.h>
  5. #include <errno.h>
  6. #include <string.h>
  7. #include "toku_assert.h"
  8. #include "brttypes.h"
  9. #include "minicron.h"
  10. static void
  11. toku_gettime (toku_timespec_t *a) {
  12. struct timeval tv;
  13. gettimeofday(&tv, 0);
  14. a->tv_sec = tv.tv_sec;
  15. a->tv_nsec = tv.tv_usec * 1000LL;
  16. }
  17. static int
  18. timespec_compare (toku_timespec_t *a, toku_timespec_t *b) {
  19. if (a->tv_sec > b->tv_sec) return 1;
  20. if (a->tv_sec < b->tv_sec) return -1;
  21. if (a->tv_nsec > b->tv_nsec) return 1;
  22. if (a->tv_nsec < b->tv_nsec) return -1;
  23. return 0;
  24. }
  25. // Implementation notes:
  26. // When calling do_shutdown or change_period, the mutex is obtained, the variables in the minicron struct are modified, and
  27. // the condition variable is signalled. Possibly the minicron thread will miss the signal. To avoid this problem, whenever
  28. // the minicron thread acquires the mutex, it must check to see what the variables say to do (e.g., should it shut down?).
  29. static void*
  30. minicron_do (void *pv)
  31. {
  32. struct minicron *p = pv;
  33. int r = toku_pthread_mutex_lock(&p->mutex);
  34. assert(r==0);
  35. while (1) {
  36. if (p->do_shutdown) {
  37. r = toku_pthread_mutex_unlock(&p->mutex);
  38. assert(r==0);
  39. return 0;
  40. }
  41. if (p->period_in_seconds==0) {
  42. // if we aren't supposed to do it then just do an untimed wait.
  43. r = toku_pthread_cond_wait(&p->condvar, &p->mutex);
  44. assert(r==0);
  45. } else {
  46. // Recompute the wakeup time every time (instead of once per call to f) in case the period changges.
  47. toku_timespec_t wakeup_at = p->time_of_last_call_to_f;
  48. wakeup_at.tv_sec += p->period_in_seconds;
  49. toku_timespec_t now;
  50. toku_gettime(&now);
  51. //printf("wakeup at %.6f (after %d seconds) now=%.6f\n", wakeup_at.tv_sec + wakeup_at.tv_nsec*1e-9, p->period_in_seconds, now.tv_sec + now.tv_nsec*1e-9);
  52. r = toku_pthread_cond_timedwait(&p->condvar, &p->mutex, &wakeup_at);
  53. if (r!=0 && r!=ETIMEDOUT) fprintf(stderr, "%s:%d r=%d (%s)", __FILE__, __LINE__, r, strerror(r));
  54. assert(r==0 || r==ETIMEDOUT);
  55. }
  56. // Now we woke up, and we should figure out what to do
  57. if (p->do_shutdown) {
  58. r = toku_pthread_mutex_unlock(&p->mutex);
  59. assert(r==0);
  60. return 0;
  61. }
  62. if (p->period_in_seconds >0) {
  63. // maybe do a checkpoint
  64. toku_timespec_t now;
  65. toku_gettime(&now);
  66. toku_timespec_t time_to_call = p->time_of_last_call_to_f;
  67. time_to_call.tv_sec += p->period_in_seconds;
  68. int compare = timespec_compare(&time_to_call, &now);
  69. //printf("compare(%.6f, %.6f)=%d\n", time_to_call.tv_sec + time_to_call.tv_nsec*1e-9, now.tv_sec+now.tv_nsec*1e-9, compare);
  70. if (compare <= 0) {
  71. r = toku_pthread_mutex_unlock(&p->mutex);
  72. assert(r==0);
  73. r = p->f(p->arg);
  74. assert(r==0);
  75. r = toku_pthread_mutex_lock(&p->mutex);
  76. assert(r==0);
  77. toku_gettime(&p->time_of_last_call_to_f); // the period is measured between calls to f.
  78. }
  79. }
  80. }
  81. }
  82. int
  83. toku_minicron_setup(struct minicron *p, u_int32_t period_in_seconds, int(*f)(void *), void *arg)
  84. {
  85. p->f = f;
  86. p->arg = arg;
  87. toku_gettime(&p->time_of_last_call_to_f);
  88. //printf("now=%.6f", p->time_of_last_call_to_f.tv_sec + p->time_of_last_call_to_f.tv_nsec*1e-9);
  89. p->period_in_seconds = period_in_seconds;
  90. p->do_shutdown = FALSE;
  91. { int r = toku_pthread_mutex_init(&p->mutex, 0); assert(r==0); }
  92. { int r = toku_pthread_cond_init (&p->condvar, 0); assert(r==0); }
  93. //printf("%s:%d setup period=%d\n", __FILE__, __LINE__, period_in_seconds);
  94. return toku_pthread_create(&p->thread, 0, minicron_do, p);
  95. }
  96. int
  97. toku_minicron_change_period(struct minicron *p, u_int32_t new_period)
  98. {
  99. int r = toku_pthread_mutex_lock(&p->mutex); assert(r==0);
  100. p->period_in_seconds = new_period;
  101. r = toku_pthread_cond_signal(&p->condvar); assert(r==0);
  102. r = toku_pthread_mutex_unlock(&p->mutex); assert(r==0);
  103. return 0;
  104. }
  105. u_int32_t
  106. toku_minicron_get_period(struct minicron *p)
  107. {
  108. int r = toku_pthread_mutex_lock(&p->mutex); assert(r==0);
  109. u_int32_t retval = toku_minicron_get_period_unlocked(p);
  110. r = toku_pthread_mutex_unlock(&p->mutex); assert(r==0);
  111. return retval;
  112. }
  113. /* unlocked function for use by engine status which takes no locks */
  114. u_int32_t
  115. toku_minicron_get_period_unlocked(struct minicron *p)
  116. {
  117. u_int32_t retval = p->period_in_seconds;
  118. return retval;
  119. }
  120. int
  121. toku_minicron_shutdown(struct minicron *p) {
  122. int r = toku_pthread_mutex_lock(&p->mutex); assert(r==0);
  123. assert(!p->do_shutdown);
  124. p->do_shutdown = TRUE;
  125. //printf("%s:%d signalling\n", __FILE__, __LINE__);
  126. r = toku_pthread_cond_signal(&p->condvar); assert(r==0);
  127. r = toku_pthread_mutex_unlock(&p->mutex); assert(r==0);
  128. void *returned_value;
  129. //printf("%s:%d joining\n", __FILE__, __LINE__);
  130. r = toku_pthread_join(p->thread, &returned_value);
  131. if (r!=0) fprintf(stderr, "%s:%d r=%d (%s)\n", __FILE__, __LINE__, r, strerror(r));
  132. assert(r==0); assert(returned_value==0);
  133. r = toku_pthread_cond_destroy(&p->condvar); assert(r==0);
  134. r = toku_pthread_mutex_destroy(&p->mutex); assert(r==0);
  135. //printf("%s:%d shutdowned\n", __FILE__, __LINE__);
  136. return 0;
  137. }
  138. BOOL
  139. toku_minicron_has_been_shutdown(struct minicron *p) {
  140. return p->do_shutdown;
  141. }