You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

203 lines
5.1 KiB

  1. #include <toku_portability.h>
  2. #include "test.h"
  3. #include <stdio.h>
  4. #include <errno.h>
  5. #include <string.h>
  6. #include "toku_pthread.h"
  7. #include "memory.h"
  8. #include "workqueue.h"
  9. #include "threadpool.h"
  // Verbosity counter: test_main increments it once per "-v" argument, and
  // each test prints its function name and line number when it is nonzero.
  10. int verbose;
  11. static WORKITEM
  12. new_workitem (void) {
  13. WORKITEM wi = (WORKITEM) toku_malloc(sizeof *wi); assert(wi);
  14. return wi;
  15. }
  16. static void
  17. destroy_workitem(WORKITEM wi) {
  18. toku_free(wi);
  19. }
  20. // test simple create and destroy
  21. static void
  22. test_create_destroy (void) {
  23. if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
  24. struct workqueue workqueue, *wq = &workqueue;
  25. workqueue_init(wq);
  26. assert(workqueue_empty(wq));
  27. workqueue_destroy(wq);
  28. }
  29. // verify that the wq implements FIFO ordering
  30. static void
  31. test_simple_enq_deq (int n) {
  32. if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
  33. struct workqueue workqueue, *wq = &workqueue;
  34. int r;
  35. workqueue_init(wq);
  36. assert(workqueue_empty(wq));
  37. WORKITEM work[n];
  38. int i;
  39. for (i=0; i<n; i++) {
  40. work[i] = new_workitem();
  41. workqueue_enq(wq, work[i], 1);
  42. assert(!workqueue_empty(wq));
  43. }
  44. for (i=0; i<n; i++) {
  45. WORKITEM wi = 0;
  46. r = workqueue_deq(wq, &wi, 1);
  47. assert(r == 0 && wi == work[i]);
  48. destroy_workitem(wi);
  49. }
  50. assert(workqueue_empty(wq));
  51. workqueue_destroy(wq);
  52. }
  53. // setting the wq closed should cause deq to return EINVAL
  54. static void
  55. test_set_closed (void) {
  56. if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
  57. struct workqueue workqueue, *wq = &workqueue;
  58. workqueue_init(wq);
  59. WORKITEM wi = 0;
  60. workqueue_set_closed(wq, 1);
  61. int r = workqueue_deq(wq, &wi, 1);
  62. assert(r == EINVAL && wi == 0);
  63. workqueue_destroy(wq);
  64. }
  65. // closing a wq with a blocked reader thread should cause the reader to get EINVAL
  66. static void *
  67. test_set_closed_waiter(void *arg) {
  68. struct workqueue *wq = arg;
  69. int r;
  70. WORKITEM wi = 0;
  71. r = workqueue_deq(wq, &wi, 1);
  72. assert(r == EINVAL && wi == 0);
  73. return arg;
  74. }
  75. static void
  76. test_set_closed_thread (void) {
  77. if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
  78. struct workqueue workqueue, *wq = &workqueue;
  79. int r;
  80. workqueue_init(wq);
  81. toku_pthread_t tid;
  82. r = toku_pthread_create(&tid, 0, test_set_closed_waiter, wq); assert(r == 0);
  83. sleep(1);
  84. workqueue_set_closed(wq, 1);
  85. void *ret;
  86. r = toku_pthread_join(tid, &ret);
  87. assert(r == 0 && ret == wq);
  88. workqueue_destroy(wq);
  89. }
  90. // verify writer reader flow control
  91. // the write (main) thread writes as fast as possible until the wq is full. then it
  92. // waits.
  93. // the read thread reads from the wq slowly using a random delay. it wakes up any
  94. // writers when the wq size <= 1/2 of the wq limit
  95. struct rwfc {
  96. struct workqueue workqueue;
  97. int current, limit;
  98. };
  99. static void rwfc_init (struct rwfc *rwfc, int limit) {
  100. workqueue_init(&rwfc->workqueue);
  101. rwfc->current = 0; rwfc->limit = limit;
  102. }
  103. static void
  104. rwfc_destroy (struct rwfc *rwfc) {
  105. workqueue_destroy(&rwfc->workqueue);
  106. }
  107. static void
  108. rwfc_do_read (WORKITEM wi) {
  109. struct rwfc *rwfc = (struct rwfc *) workitem_arg(wi);
  110. workqueue_lock(&rwfc->workqueue);
  111. if (2*rwfc->current-- > rwfc->limit && 2*rwfc->current <= rwfc->limit) {
  112. workqueue_wakeup_write(&rwfc->workqueue, 0);
  113. }
  114. workqueue_unlock(&rwfc->workqueue);
  115. destroy_workitem(wi);
  116. }
  117. static void *
  118. rwfc_worker (void *arg) {
  119. struct workqueue *wq = arg;
  120. while (1) {
  121. WORKITEM wi = 0;
  122. int r = workqueue_deq(wq, &wi, 1);
  123. if (r == EINVAL) {
  124. assert(wi == 0);
  125. break;
  126. }
  127. usleep(random() % 100);
  128. wi->f(wi);
  129. }
  130. return arg;
  131. }
  132. static void
  133. test_flow_control (int limit, int n, int maxthreads) {
  134. if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
  135. struct rwfc my_rwfc, *rwfc = &my_rwfc;
  136. THREADPOOL tp;
  137. int i;
  138. rwfc_init(rwfc, limit);
  139. threadpool_create(&tp, maxthreads);
  140. for (i=0; i<maxthreads; i++)
  141. threadpool_maybe_add(tp, rwfc_worker, &rwfc->workqueue);
  142. sleep(1); // this is here to block the reader on the first deq
  143. for (i=0; i<n; i++) {
  144. WORKITEM wi = new_workitem();
  145. workitem_init(wi, rwfc_do_read, rwfc);
  146. workqueue_lock(&rwfc->workqueue);
  147. workqueue_enq(&rwfc->workqueue, wi, 0);
  148. rwfc->current++;
  149. while (rwfc->current >= rwfc->limit) {
  150. // printf("%d - %d %d\n", i, rwfc->current, rwfc->limit);
  151. workqueue_wait_write(&rwfc->workqueue, 0);
  152. }
  153. workqueue_unlock(&rwfc->workqueue);
  154. // toku_os_usleep(random() % 1);
  155. }
  156. workqueue_set_closed(&rwfc->workqueue, 1);
  157. threadpool_destroy(&tp);
  158. rwfc_destroy(rwfc);
  159. }
  160. int
  161. test_main (int argc, const char *argv[]) {
  162. int i;
  163. for (i=1; i<argc; i++) {
  164. const char *arg = argv[i];
  165. if (strcmp(arg, "-v") == 0)
  166. verbose++;
  167. }
  168. test_create_destroy();
  169. test_simple_enq_deq(0);
  170. test_simple_enq_deq(42);
  171. test_set_closed();
  172. test_set_closed_thread();
  173. test_flow_control(8, 10000, 1);
  174. test_flow_control(8, 10000, 2);
  175. test_flow_control(8, 10000, 17);
  176. return 0;
  177. }