Browse Source
Merge the changes from 2499d onto the main line. Fixes #2499. close[t:2499].
Merge the changes from 2499d onto the main line. Fixes #2499. close[t:2499].
{{{
svn merge -r 19523:19895 https://svn.tokutek.com/tokudb/toku/tokudb.2499d
}}}
.
git-svn-id: file:///svn/toku/tokudb@19902 c7de825b-a66e-492c-adef-691d508d4ae1
pull/73/head
committed by
Yoni Fogel
136 changed files with 4062 additions and 1024 deletions
-
2linux/file.c
-
2linux/linux.c
-
3linux/os_malloc.c
-
6linux/toku_htonl.h
-
7linux/toku_os_types.h
-
2linux/toku_pthread.c
-
14linux/toku_pthread.h
-
2linux/toku_stdint.h
-
1linux/toku_stdlib.h
-
7linux/toku_time.h
-
17newbrt/Makefile
-
2newbrt/backwards_10.c
-
2newbrt/backwards_10.h
-
2newbrt/block_allocator.c
-
10newbrt/block_allocator.h
-
2newbrt/block_table.c
-
10newbrt/block_table.h
-
2newbrt/brt-XY.c
-
11newbrt/brt-internal.h
-
16newbrt/brt-search.h
-
2newbrt/brt-serialize.c
-
2newbrt/brt-test-helpers.c
-
2newbrt/brt-verify.c
-
21newbrt/brt.c
-
10newbrt/brt.h
-
2newbrt/brt_msg.c
-
16newbrt/brt_msg.h
-
34newbrt/brtdump.c
-
185newbrt/brtloader-internal.h
-
1882newbrt/brtloader.c
-
28newbrt/brtloader.h
-
10newbrt/brttypes.h
-
2newbrt/cachetable.c
-
13newbrt/cachetable.h
-
2newbrt/checkpoint.c
-
14newbrt/checkpoint.h
-
2newbrt/fifo.c
-
8newbrt/fifo.h
-
2newbrt/fifo_msg.c
-
15newbrt/fifo_msg.h
-
2newbrt/fingerprint.c
-
16newbrt/hashfun.h
-
2newbrt/hclip.c
-
2newbrt/includes.h
-
2newbrt/key.c
-
16newbrt/key.h
-
14newbrt/kv-pair.h
-
2newbrt/leafentry.c
-
10newbrt/leafentry.h
-
2newbrt/leaflock.c
-
11newbrt/leaflock.h
-
18newbrt/lock.h
-
11newbrt/log-internal.h
-
17newbrt/log.h
-
2newbrt/logcursor.c
-
10newbrt/logcursor.h
-
2newbrt/logfilemgr.c
-
11newbrt/logfilemgr.h
-
8newbrt/logformat.c
-
2newbrt/logger.c
-
13newbrt/logger.h
-
2newbrt/memarena.c
-
15newbrt/memarena.h
-
2newbrt/memory-debug.c
-
2newbrt/mempool.c
-
11newbrt/mempool.h
-
2newbrt/merger.c
-
12newbrt/merger.h
-
2newbrt/minicron.c
-
14newbrt/minicron.h
-
2newbrt/omt.c
-
16newbrt/omt.h
-
143newbrt/pqueue.c
-
41newbrt/pqueue.h
-
178newbrt/queue.c
-
56newbrt/queue.h
-
14newbrt/rbuf.h
-
2newbrt/recover.c
-
10newbrt/recover.h
-
2newbrt/roll.c
-
11newbrt/roll.h
-
2newbrt/rollback.c
-
13newbrt/rollback.h
-
2newbrt/rwlock.c
-
10newbrt/rwlock.h
-
7newbrt/sub_block.h
-
2newbrt/tdb-recover.c
-
2newbrt/tdb_logprint.c
-
19newbrt/tests/Makefile
-
215newbrt/tests/brtloader-test-write-dbfile.c
-
131newbrt/tests/brtloader-test.c
-
2newbrt/tests/keyrange-unflat.c
-
1newbrt/tests/make.include
-
231newbrt/tests/pqueue-test.c
-
94newbrt/tests/queue-test.c
-
17newbrt/tests/test.h
-
2newbrt/threadpool.c
-
12newbrt/threadpool.h
-
2newbrt/toku_worker.c
-
15newbrt/toku_worker.h
@ -1 +1,2 @@ |
|||
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved." |
|||
#include <stdlib.h> |
|||
@ -1,17 +1,20 @@ |
|||
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved." |
|||
|
|||
#ifndef TOKU_TIME_H |
|||
#define TOKU_TIME_H |
|||
|
|||
#include <time.h> |
|||
#include <sys/time.h> |
|||
|
|||
#ifdef __cplusplus |
|||
#if defined(__cplusplus) || defined(__cilkplusplus) |
|||
extern "C" { |
|||
#endif |
|||
|
|||
static inline float toku_tdiff (struct timeval *a, struct timeval *b) { |
|||
return (a->tv_sec - b->tv_sec) +1e-6*(a->tv_usec - b->tv_usec); |
|||
} |
|||
#ifdef __cplusplus |
|||
|
|||
#if defined(__cplusplus) || defined(__cilkplusplus) |
|||
}; |
|||
#endif |
|||
|
|||
|
|||
1882
newbrt/brtloader.c
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -1,12 +1,26 @@ |
|||
#ifndef TOKU_KEY_H |
|||
#define TOKU_KEY_H |
|||
|
|||
#ident "$Id$" |
|||
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved." |
|||
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved." |
|||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." |
|||
|
|||
#include "ybt.h" |
|||
#include "brttypes.h" |
|||
|
|||
#if defined(__cplusplus) || defined(__cilkplusplus) |
|||
extern "C" { |
|||
#endif |
|||
|
|||
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); |
|||
void toku_test_keycompare (void) ; |
|||
|
|||
int toku_builtin_compare_fun (DB *, const DBT *, const DBT*) __attribute__((__visibility__("default"))); |
|||
int toku_dont_call_this_compare_fun (DB *, const DBT *, const DBT*); |
|||
|
|||
#if defined(__cplusplus) || defined(__cilkplusplus) |
|||
}; |
|||
#endif |
|||
|
|||
|
|||
#endif |
|||
@ -0,0 +1,143 @@ |
|||
/* -*- mode: C; c-basic-offset: 4 -*- */ |
|||
#ident "$Id: pqueue.c$" |
|||
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved." |
|||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." |
|||
|
|||
#include "brt-internal.h" |
|||
#include "brtloader-internal.h" |
|||
#include "pqueue.h" |
|||
|
|||
#define pqueue_left(i) ((i) << 1) |
|||
#define pqueue_right(i) (((i) << 1) + 1) |
|||
#define pqueue_parent(i) ((i) >> 1) |
|||
|
|||
int pqueue_init(pqueue_t **result, size_t n, int which_db, DB *db, brt_compare_func compare, struct error_callback_s *err_callback) |
|||
{ |
|||
pqueue_t *q; |
|||
if (!(q = toku_malloc(sizeof(pqueue_t)))) |
|||
return errno; |
|||
|
|||
/* Need to allocate n+1 elements since element 0 isn't used. */ |
|||
if (!(q->d = toku_malloc((n + 1) * sizeof(pqueue_node_t *)))) { |
|||
int r = errno; |
|||
toku_free(q); |
|||
return r; |
|||
} |
|||
q->size = 1; |
|||
q->avail = q->step = (n+1); /* see comment above about n+1 */ |
|||
|
|||
q->which_db = which_db; |
|||
q->db = db; |
|||
q->compare = compare; |
|||
q->dup_error = 0; |
|||
|
|||
q->error_callback = err_callback; |
|||
|
|||
*result = q; |
|||
return 0; |
|||
} |
|||
|
|||
void pqueue_free(pqueue_t *q) |
|||
{ |
|||
toku_free(q->d); |
|||
toku_free(q); |
|||
} |
|||
|
|||
|
|||
size_t pqueue_size(pqueue_t *q) |
|||
{ |
|||
/* queue element 0 exists but doesn't count since it isn't used. */ |
|||
return (q->size - 1); |
|||
} |
|||
|
|||
static int pqueue_compare(pqueue_t *q, DBT *next_key, DBT *next_val, DBT *curr_key) |
|||
{ |
|||
int r = q->compare(q->db, next_key, curr_key); |
|||
if ( r == 0 ) { // duplicate key : next_key == curr_key |
|||
q->dup_error = 1; |
|||
if (q->error_callback->set_error_and_callback) |
|||
q->error_callback->set_error_and_callback(q->error_callback->bl, DB_KEYEXIST, q->db, q->which_db, next_key, next_val); |
|||
} |
|||
return ( r > -1 ); |
|||
} |
|||
|
|||
static void pqueue_bubble_up(pqueue_t *q, size_t i) |
|||
{ |
|||
size_t parent_node; |
|||
pqueue_node_t *moving_node = q->d[i]; |
|||
DBT *moving_key = moving_node->key; |
|||
|
|||
for (parent_node = pqueue_parent(i); |
|||
((i > 1) && pqueue_compare(q, q->d[parent_node]->key, q->d[parent_node]->val, moving_key)); |
|||
i = parent_node, parent_node = pqueue_parent(i)) |
|||
{ |
|||
q->d[i] = q->d[parent_node]; |
|||
} |
|||
|
|||
q->d[i] = moving_node; |
|||
} |
|||
|
|||
|
|||
static size_t pqueue_maxchild(pqueue_t *q, size_t i) |
|||
{ |
|||
size_t child_node = pqueue_left(i); |
|||
|
|||
if (child_node >= q->size) |
|||
return 0; |
|||
|
|||
if ((child_node+1) < q->size && |
|||
pqueue_compare(q, q->d[child_node]->key, q->d[child_node]->val, q->d[child_node+1]->key)) |
|||
child_node++; /* use right child instead of left */ |
|||
|
|||
return child_node; |
|||
} |
|||
|
|||
|
|||
static void pqueue_percolate_down(pqueue_t *q, size_t i) |
|||
{ |
|||
size_t child_node; |
|||
pqueue_node_t *moving_node = q->d[i]; |
|||
DBT *moving_key = moving_node->key; |
|||
DBT *moving_val = moving_node->val; |
|||
|
|||
while ((child_node = pqueue_maxchild(q, i)) && |
|||
pqueue_compare(q, moving_key, moving_val, q->d[child_node]->key)) |
|||
{ |
|||
q->d[i] = q->d[child_node]; |
|||
i = child_node; |
|||
} |
|||
|
|||
q->d[i] = moving_node; |
|||
} |
|||
|
|||
|
|||
int pqueue_insert(pqueue_t *q, pqueue_node_t *d) |
|||
{ |
|||
size_t i; |
|||
|
|||
if (!q) return 1; |
|||
if (q->size >= q->avail) return 1; |
|||
|
|||
/* insert item */ |
|||
i = q->size++; |
|||
q->d[i] = d; |
|||
pqueue_bubble_up(q, i); |
|||
|
|||
if ( q->dup_error ) return DB_KEYEXIST; |
|||
return 0; |
|||
} |
|||
|
|||
int pqueue_pop(pqueue_t *q, pqueue_node_t **d) |
|||
{ |
|||
if (!q || q->size == 1) { |
|||
*d = NULL; |
|||
return 0; |
|||
} |
|||
|
|||
*d = q->d[1]; |
|||
q->d[1] = q->d[--q->size]; |
|||
pqueue_percolate_down(q, 1); |
|||
|
|||
if ( q->dup_error ) return DB_KEYEXIST; |
|||
return 0; |
|||
} |
|||
@ -0,0 +1,41 @@ |
|||
#ifndef TOKU_PQUEUE_H |
|||
#define TOKU_PQUEUE_H |
|||
|
|||
#if defined(__cplusplus) || defined(__cilkplusplus) |
|||
extern "C" { |
|||
#endif |
|||
|
|||
typedef struct brt_pqueue_node_t |
|||
{ |
|||
DBT *key; |
|||
DBT *val; |
|||
int i; |
|||
} pqueue_node_t; |
|||
|
|||
typedef struct brt_pqueue_t |
|||
{ |
|||
size_t size; |
|||
size_t avail; |
|||
size_t step; |
|||
|
|||
int which_db; |
|||
DB *db; // needed for compare function |
|||
brt_compare_func compare; |
|||
pqueue_node_t **d; |
|||
int dup_error; |
|||
|
|||
struct error_callback_s *error_callback; |
|||
|
|||
} pqueue_t; |
|||
|
|||
int pqueue_init(pqueue_t **result, size_t n, int which_db, DB *db, brt_compare_func compare, struct error_callback_s *err_callback); |
|||
void pqueue_free(pqueue_t *q); |
|||
size_t pqueue_size(pqueue_t *q); |
|||
int pqueue_insert(pqueue_t *q, pqueue_node_t *d); |
|||
int pqueue_pop(pqueue_t *q, pqueue_node_t **d); |
|||
|
|||
#if defined(__cplusplus) || defined(__cilkplusplus) |
|||
}; |
|||
#endif |
|||
|
|||
#endif //TOKU_PQUEUE_H |
|||
@ -0,0 +1,178 @@ |
|||
#include <errno.h> |
|||
#include <toku_assert.h> |
|||
#include "queue.h" |
|||
#include "memory.h" |
|||
#include "toku_pthread.h" |
|||
|
|||
struct qitem; |
|||
|
|||
struct qitem { |
|||
void *item; |
|||
struct qitem *next; |
|||
u_int64_t weight; |
|||
}; |
|||
|
|||
struct queue { |
|||
u_int64_t contents_weight; // how much stuff is in there? |
|||
u_int64_t weight_limit; // Block enqueueing when the contents gets to be bigger than the weight. |
|||
struct qitem *head, *tail; |
|||
|
|||
BOOL eof; |
|||
|
|||
pthread_mutex_t mutex; |
|||
pthread_cond_t cond; |
|||
}; |
|||
|
|||
// Representation invariant: |
|||
// q->contents_weight is the sum of the weights of everything in the queue. |
|||
// q->weight_limit is the limit on the weight before we block. |
|||
// q->head is the oldest thing in the queue. q->tail is the newest. (If nothing is in the queue then both are NULL) |
|||
// If q->head is not null: |
|||
// q->head->item is the oldest item. |
|||
// q->head->weight is the weight of that item. |
|||
// q->head->next is the next youngest thing. |
|||
// q->eof indicates that the producer has said "that's all". |
|||
// q->mutex and q->cond are used as condition variables. |
|||
|
|||
|
|||
int queue_create (QUEUE *q, u_int64_t weight_limit) |
|||
{ |
|||
QUEUE MALLOC(result); |
|||
if (result==NULL) return errno; |
|||
result->contents_weight = 0; |
|||
result->weight_limit = weight_limit; |
|||
result->head = NULL; |
|||
result->tail = NULL; |
|||
result->eof = FALSE; |
|||
int r; |
|||
r = toku_pthread_mutex_init(&result->mutex, NULL); |
|||
if (r!=0) { |
|||
toku_free(result); |
|||
return r; |
|||
} |
|||
r = toku_pthread_cond_init(&result->cond, NULL); |
|||
if (r!=0) { |
|||
toku_pthread_mutex_destroy(&result->mutex); |
|||
toku_free(result); |
|||
return r; |
|||
} |
|||
*q = result; |
|||
return 0; |
|||
} |
|||
|
|||
int queue_destroy (QUEUE q) |
|||
{ |
|||
if (q->head) return EINVAL; |
|||
assert(q->contents_weight==0); |
|||
{ |
|||
int r = toku_pthread_mutex_destroy(&q->mutex); |
|||
if (r) return r; |
|||
} |
|||
{ |
|||
int r = toku_pthread_cond_destroy(&q->cond); |
|||
if (r) return r; |
|||
} |
|||
toku_free(q); |
|||
return 0; |
|||
} |
|||
|
|||
int queue_enq (QUEUE q, void *item, u_int64_t weight, u_int64_t *total_weight_after_enq) |
|||
{ |
|||
{ |
|||
int r = toku_pthread_mutex_lock(&q->mutex); |
|||
if (r) return r; |
|||
} |
|||
assert(!q->eof); |
|||
// Go ahead and put it in, even if it's too much. |
|||
q->contents_weight += weight; |
|||
struct qitem *MALLOC(qi); |
|||
if (qi==NULL) return errno; |
|||
qi->item = item; |
|||
qi->weight = weight; |
|||
qi->next = NULL; |
|||
if (q->tail) { |
|||
q->tail->next = qi; |
|||
} else { |
|||
assert(q->head==NULL); |
|||
q->head = qi; |
|||
} |
|||
q->tail = qi; |
|||
// Wake up the consumer. |
|||
{ |
|||
int r = toku_pthread_cond_signal(&q->cond); |
|||
if (r) return r; |
|||
} |
|||
// Now block if there's too much stuff in there. |
|||
while (q->weight_limit < q->contents_weight) { |
|||
int r = toku_pthread_cond_wait(&q->cond, &q->mutex); |
|||
if (r) return r; |
|||
} |
|||
// we are allowed to return. |
|||
if (total_weight_after_enq) { |
|||
*total_weight_after_enq = q->contents_weight; |
|||
} |
|||
{ |
|||
int r = toku_pthread_mutex_unlock(&q->mutex); |
|||
if (r) return r; |
|||
} |
|||
return 0; |
|||
} |
|||
|
|||
int queue_eof (QUEUE q) |
|||
{ |
|||
{ |
|||
int r = toku_pthread_mutex_lock(&q->mutex); |
|||
if (r) return r; |
|||
} |
|||
assert(!q->eof); |
|||
q->eof = TRUE; |
|||
{ |
|||
int r = toku_pthread_cond_signal(&q->cond); |
|||
if (r) return r; |
|||
} |
|||
{ |
|||
int r = toku_pthread_mutex_unlock(&q->mutex); |
|||
if (r) return r; |
|||
} |
|||
return 0; |
|||
} |
|||
|
|||
int queue_deq (QUEUE q, void **item, u_int64_t *weight, u_int64_t *total_weight_after_deq) |
|||
{ |
|||
{ |
|||
int r = toku_pthread_mutex_lock(&q->mutex); |
|||
if (r) return r; |
|||
} |
|||
int result; |
|||
while (q->head==NULL && !q->eof) { |
|||
int r = toku_pthread_cond_wait(&q->cond, &q->mutex); |
|||
if (r) return r; |
|||
} |
|||
if (q->head==NULL) { |
|||
assert(q->eof); |
|||
result = EOF; |
|||
} else { |
|||
struct qitem *head = q->head; |
|||
q->contents_weight -= head->weight; |
|||
*item = head->item; |
|||
if (weight) |
|||
*weight = head->weight; |
|||
if (total_weight_after_deq) |
|||
*total_weight_after_deq = q->contents_weight; |
|||
q->head = head->next; |
|||
toku_free(head); |
|||
if (q->head==NULL) { |
|||
q->tail = NULL; |
|||
} |
|||
// wake up the producer, since we decreased the contents_weight. |
|||
int r = toku_pthread_cond_signal(&q->cond); |
|||
if (r!=0) return r; |
|||
// Successful result. |
|||
result = 0; |
|||
} |
|||
{ |
|||
int r = toku_pthread_mutex_unlock(&q->mutex); |
|||
if (r) return r; |
|||
} |
|||
return result; |
|||
} |
|||
@ -0,0 +1,56 @@ |
|||
#ifndef TOKU_QUEUE_H |
|||
#define TOKU_QUEUE_H |
|||
|
|||
#include "brttypes.h" |
|||
|
|||
#if defined(__cplusplus) || defined(__cilkplusplus) |
|||
extern "C" { |
|||
#endif |
|||
|
|||
// The abstraction: |
|||
// |
|||
// queue.h implements a queue suitable for a producer-consumer relationship between two pthreads. |
|||
// The enqueue/dequeue operation is fairly heavyweight (involving pthread condition variables) so it may be useful |
|||
// to enqueue large chunks rather than small chunks. |
|||
// It probably won't work right to have two consumer threads. |
|||
// |
|||
// Every item inserted into the queue has a weight. If the weight |
|||
// gets too big, then the queue blocks on trying to insert more items. |
|||
// The weight can be used to limit the total number of items in the |
|||
// queue (weight of each item=1) or the total memory consumed by queue |
|||
// items (weight of each item is its size). Or the weight's could all be |
|||
// zero for an unlimited queue. |
|||
|
|||
typedef struct queue *QUEUE; |
|||
|
|||
int queue_create (QUEUE *q, u_int64_t weight_limit); |
|||
// Effect: Create a queue with a given weight limit. The queue is initially empty. |
|||
|
|||
int queue_enq (QUEUE q, void *item, u_int64_t weight, u_int64_t *total_weight_after_enq); |
|||
// Effect: Insert ITEM of weight WEIGHT into queue. If the resulting contents weight too much then block (don't return) until the total weight is low enough. |
|||
// If total_weight_after_enq!=NULL then return the current weight of the items in the queue (after finishing blocking on overweight, and after enqueueing the item). |
|||
// If successful return 0. |
|||
// If an error occurs, return the error number, and the state of the queue is undefined. The item may have been enqueued or not, and in fact the queue may be badly corrupted if the condition variables go awry. If it's just a matter of out-of-memory, then the queue is probably OK. |
|||
// Requires: There is only a single consumer. (We wake up the consumer using a pthread_cond_signal (which is suitable only for single consumers.) |
|||
|
|||
int queue_eof (QUEUE q); |
|||
// Effect: Inform the queue that no more values will be inserted. After all the values that have been inserted are dequeued, further dequeue operations will return EOF. |
|||
// Returns 0 on success. On failure, things are pretty bad (likely to be some sort of mutex failure). |
|||
|
|||
int queue_deq (QUEUE q, void **item, u_int64_t *weight, u_int64_t *total_weight_after_deq); |
|||
// Effect: Wait until the queue becomes nonempty. Then dequeue and return the oldest item. The item and its weight are returned in *ITEM. |
|||
// If weight!=NULL then return the item's weight in *weight. |
|||
// If total_weight_after_deq!=NULL then return the current weight of the items in the queue (after dequeuing the item). |
|||
// Return 0 if an item is returned. |
|||
// Return EOF is we no more items will be returned. |
|||
// Usage note: The queue should be destroyed only after any consumers will no longer look at it (for example, they saw EOF). |
|||
|
|||
int queue_destroy (QUEUE q); |
|||
// Effect: Destroy the queue. |
|||
// Requires: The queue must be empty and no consumer should try to dequeue after this (one way to do this is to make sure the consumer saw EOF). |
|||
// Returns 0 on success. If the queue is not empty, returns EINVAL. Other errors are likely to be bad (some sort of mutex or condvar failure). |
|||
|
|||
#if defined(__cplusplus) || defined(__cilkplusplus) |
|||
}; |
|||
#endif |
|||
#endif |
|||
@ -0,0 +1,215 @@ |
|||
/* -*- mode: C; c-basic-offset: 4 -*- */ |
|||
#ident "$Id: pqueue.c$" |
|||
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved." |
|||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." |
|||
|
|||
// test the loader write dbfile function |
|||
|
|||
#include "includes.h" |
|||
#include "test.h" |
|||
#include "brtloader-internal.h" |
|||
|
|||
#if defined(__cplusplus) |
|||
extern "C" { |
|||
#endif |
|||
|
|||
static void traceit(const char *s) { |
|||
time_t t = time(NULL); |
|||
printf("%.24s %s\n", ctime(&t), s); |
|||
fflush(stdout); |
|||
} |
|||
|
|||
static int qsort_compare_ints (const void *a, const void *b) { |
|||
int avalue = *(int*)a; |
|||
int bvalue = *(int*)b; |
|||
if (avalue<bvalue) return -1; |
|||
if (avalue>bvalue) return +1; |
|||
return 0; |
|||
|
|||
} |
|||
|
|||
static int compare_ints (DB *dest_db, const DBT *akey, const DBT *bkey) { |
|||
assert(dest_db==NULL); |
|||
assert(akey->size==sizeof(int)); |
|||
assert(bkey->size==sizeof(int)); |
|||
return qsort_compare_ints(akey->data, bkey->data); |
|||
} |
|||
|
|||
static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) { |
|||
fprintf(stderr, "error in test"); |
|||
abort(); |
|||
} |
|||
|
|||
static void verify_dbfile(int n, const char *name) { |
|||
if (verbose) traceit("verify"); |
|||
|
|||
int r; |
|||
|
|||
CACHETABLE ct; |
|||
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0); |
|||
|
|||
TOKUTXN const null_txn = NULL; |
|||
BRT t = NULL; |
|||
r = toku_brt_create(&t); assert(r == 0); |
|||
r = toku_brt_set_bt_compare(t, compare_ints); assert(r == 0); |
|||
r = toku_brt_open(t, name, 0, 0, ct, null_txn, 0); assert(r==0); |
|||
|
|||
BRT_CURSOR cursor = NULL; |
|||
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE); assert(r == 0); |
|||
|
|||
int i; |
|||
for (i=0; ; i++) { |
|||
int kk = i; |
|||
int vv = i; |
|||
struct check_pair pair = {sizeof kk, &kk, sizeof vv, &vv, 0}; |
|||
r = toku_brt_cursor_get(cursor, NULL, NULL, lookup_checkf, &pair, DB_NEXT); |
|||
if (r != 0) { |
|||
assert(pair.call_count ==0); |
|||
break; |
|||
} |
|||
assert(pair.call_count==1); |
|||
} |
|||
|
|||
assert(i == n); |
|||
|
|||
r = toku_brt_cursor_close(cursor); assert(r == 0); |
|||
r = toku_close_brt(t, 0); assert(r==0); |
|||
r = toku_cachetable_close(&ct);assert(r==0); |
|||
if (verbose) traceit("verify done"); |
|||
} |
|||
|
|||
static void test_write_dbfile (char *template, int n, char *output_name) { |
|||
if (verbose) traceit("test start"); |
|||
|
|||
DB *dest_db = NULL; |
|||
struct brtloader_s bl = {.panic = 0, |
|||
.temp_file_template = template}; |
|||
int r = brtloader_init_file_infos(&bl.file_infos); |
|||
CKERR(r); |
|||
struct merge_fileset fs; |
|||
init_merge_fileset(&fs); |
|||
|
|||
// put rows in the row set |
|||
struct rowset aset; |
|||
init_rowset(&aset); |
|||
for (int i=0; i<n; i++) { |
|||
DBT key = {.size=sizeof i, |
|||
.data=&i}; |
|||
DBT val = {.size=sizeof i, |
|||
.data=&i}; |
|||
add_row(&aset, &key, &val); |
|||
} |
|||
|
|||
toku_brt_loader_set_n_rows(&bl, n); |
|||
|
|||
brt_loader_init_error_callback(&bl); |
|||
brt_loader_set_error_function(&bl, err_cb, NULL); |
|||
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints, 0); CKERR(r); |
|||
destroy_rowset(&aset); |
|||
|
|||
QUEUE q; |
|||
r = queue_create(&q, 0xFFFFFFFF); // infinite queue. |
|||
assert(r==0); |
|||
r = merge_files(&fs, &bl, 0, dest_db, compare_ints, 0, q); CKERR(r); |
|||
assert(fs.n_temp_files==0); |
|||
|
|||
QUEUE q2; |
|||
r = queue_create(&q2, 0xFFFFFFFF); // infinite queue. |
|||
assert(r==0); |
|||
|
|||
size_t num_found = 0; |
|||
while (1) { |
|||
void *v; |
|||
r = queue_deq(q, &v, NULL, NULL); |
|||
if (r==EOF) break; |
|||
struct rowset *rs = (struct rowset *)v; |
|||
printf("v=%p\n", v); |
|||
|
|||
for (size_t i=num_found; i<rs->n_rows; i++) { |
|||
struct row *row = &rs->rows[i]; |
|||
assert(row->klen==sizeof(int)); |
|||
assert(row->vlen==sizeof(int)); |
|||
assert((int)i==*(int*)(rs->data+row->off)); |
|||
} |
|||
|
|||
num_found += rs->n_rows; |
|||
|
|||
r = queue_enq(q2, v, 0, NULL); |
|||
assert(r==0); |
|||
} |
|||
assert((int)num_found == n); |
|||
|
|||
r = queue_eof(q2); |
|||
assert(r==0); |
|||
|
|||
r = queue_destroy(q); |
|||
assert(r==0); |
|||
|
|||
struct descriptor desc = {.version = 1, .dbt = (DBT){.size = 4, .data="abcd"}}; |
|||
|
|||
int fd = open(output_name, O_RDWR | O_CREAT | O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); |
|||
assert(fd>=0); |
|||
|
|||
if (verbose) traceit("write to file"); |
|||
r = toku_loader_write_brt_from_q_in_C(&bl, &desc, fd, 1000, q2); |
|||
assert(r==0); |
|||
|
|||
r = queue_destroy(q2); |
|||
assert(r==0); |
|||
|
|||
destroy_merge_fileset(&fs); |
|||
brtloader_fi_destroy(&bl.file_infos, FALSE); |
|||
|
|||
// walk a cursor through the dbfile and verify the rows |
|||
verify_dbfile(n, output_name); |
|||
|
|||
brt_loader_destroy_error_callback(&bl); |
|||
} |
|||
|
|||
/* Test to see if we can open temporary files. */ |
|||
int test_main (int argc, const char *argv[]) { |
|||
const char *progname=argv[0]; |
|||
int n = 1; |
|||
argc--; argv++; |
|||
while (argc>0) { |
|||
if (strcmp(argv[0],"-v")==0) { |
|||
verbose=1; |
|||
} else if (strcmp(argv[0],"-q")==0) { |
|||
verbose=0; |
|||
} else if (strcmp(argv[0],"-n") == 0) { |
|||
argc--; argv++; |
|||
n = atoi(argv[0]); |
|||
} else if (argc!=1) { |
|||
fprintf(stderr, "Usage:\n %s [-v] [-q] directory\n", progname); |
|||
exit(1); |
|||
} |
|||
else { |
|||
break; |
|||
} |
|||
argc--; argv++; |
|||
} |
|||
assert(argc==1); // argv[1] is the directory in which to do the test. |
|||
const char* directory = argv[0]; |
|||
char unlink_all[strlen(directory)+20]; |
|||
snprintf(unlink_all, strlen(directory)+20, "rm -rf %s", directory); |
|||
system(unlink_all); |
|||
int r = toku_os_mkdir(directory, 0755); |
|||
assert(r==0); |
|||
|
|||
int templen = strlen(directory)+15; |
|||
char template[templen]; |
|||
int tlen = snprintf(template, templen, "%s/tempXXXXXX", directory); |
|||
assert (tlen>0 && tlen<templen); |
|||
|
|||
char output_name[templen]; |
|||
int olen = snprintf(output_name, templen, "%s/test.tokudb", directory); |
|||
assert (olen>0 && olen<templen); |
|||
|
|||
test_write_dbfile(template, n, output_name); |
|||
|
|||
return 0; |
|||
} |
|||
|
|||
#if defined(__cplusplus) |
|||
}; |
|||
#endif |
|||
@ -0,0 +1,231 @@ |
|||
/* -*- mode: C; c-basic-offset: 4 -*- */ |
|||
#ident "$Id: pqueue.c$" |
|||
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved." |
|||
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." |
|||
|
|||
#include "test.h" |
|||
#include "includes.h" |
|||
#include "brtloader-internal.h" |
|||
#include "pqueue.h" |
|||
|
|||
int found_dup = -1; |
|||
|
|||
// simple compare func |
|||
static int test_compare(DB *db, const DBT *dbta, const DBT *dbtb) |
|||
{ |
|||
db = db; |
|||
int a = *((int*)dbta->data); |
|||
int b = *((int*)dbtb->data); |
|||
if ( a<b ) return -1; |
|||
if ( a>b ) return 1; |
|||
return 0; |
|||
} |
|||
|
|||
static inline DBT *dbt_init(DBT *dbt, void *data, u_int32_t size) { |
|||
memset(dbt, 0, sizeof *dbt); |
|||
dbt->data = data; |
|||
dbt->size = size; |
|||
return dbt; |
|||
} |
|||
|
|||
static void err_cb(DB *db, int which_db, int err, DBT *key, DBT *val, void *extra) { |
|||
db = db; which_db = which_db; err = err; extra = extra; |
|||
val = val; |
|||
found_dup = *(int *)key->data; |
|||
if (verbose) printf("err_cb : key <%d> val <%d>\n", *(int *)key->data, *(int *)val->data); |
|||
} |
|||
|
|||
static int err_cb_set_error_and_callback(BRTLOADER bl, int error, DB *db, int which_db, DBT *key, DBT *val) { |
|||
bl->error_callback.error_callback(db, which_db, error, key, val, bl->error_callback.extra); |
|||
return 0; |
|||
} |
|||
|
|||
static int run_test(void) |
|||
{ |
|||
const int n_sources=10; |
|||
pqueue_t *pq; |
|||
pqueue_node_t *pq_nodes = (pqueue_node_t *) toku_malloc( n_sources * sizeof(pqueue_node_t)); |
|||
pqueue_node_t *node; |
|||
DB *dest_db = NULL; |
|||
brt_compare_func compare = test_compare; |
|||
int r; |
|||
struct error_callback_s error_callback = { |
|||
.error_callback = err_cb, |
|||
.extra = NULL, |
|||
.db = NULL, |
|||
.which_db = 0, |
|||
.set_error_and_callback = err_cb_set_error_and_callback |
|||
}; |
|||
|
|||
r = pqueue_init(&pq, n_sources, 0, dest_db, compare, &error_callback); |
|||
if (r) return r; |
|||
|
|||
DBT keys[n_sources]; |
|||
DBT vals[n_sources]; |
|||
DBT zero = {.data=0, .flags=DB_DBT_REALLOC, .size=0, .ulen=0}; |
|||
int key_data[10] = {0, 4, 8, 9, 5, 1, 2, 6, 7, 3}; |
|||
|
|||
for (int i=0;i<n_sources; i++) { |
|||
if (verbose) printf("%d ", key_data[i]); |
|||
keys[i] = zero; |
|||
vals[i] = zero; |
|||
dbt_init(&keys[i], &key_data[i], sizeof(int)); |
|||
} |
|||
if (verbose) printf("\n"); |
|||
|
|||
// test 1 : fill it up, then empty it out |
|||
for (int i=0; i<n_sources; i++) { |
|||
pq_nodes[i].key = &keys[i]; |
|||
pq_nodes[i].val = &vals[i]; |
|||
pq_nodes[i].i = i; |
|||
pqueue_insert(pq, &pq_nodes[i]); |
|||
} |
|||
|
|||
for (int i=0; i<n_sources; i++) { |
|||
r = pqueue_pop(pq, &node); assert(r==0); |
|||
if (verbose) printf("%d : %d\n", i, *(int*)(node->key->data)); |
|||
if ( *(int*)(node->key->data) != i ) { if (verbose) printf("FAIL\n"); return -1; } |
|||
} |
|||
pqueue_free(pq); |
|||
if (verbose) printf("test1 : PASS\n"); |
|||
|
|||
// test 2 : fill it, then empty and reload, then empty |
|||
{ |
|||
r = pqueue_init(&pq, n_sources, 0, dest_db, compare, &error_callback); |
|||
if (r) return r; |
|||
} |
|||
|
|||
DBT more_keys[20]; |
|||
DBT more_vals[20]; |
|||
int more_key_data[20] = {0, 4, 8, 9, 5, 1, 2, 6, 7, 3, 10, 11, 14, 13, 12, 17, 19, 15, 18, 16}; |
|||
for (int i=0; i<20; i++) { |
|||
more_keys[i] = zero; |
|||
more_vals[i] = zero; |
|||
dbt_init(&more_keys[i], &more_key_data[i], sizeof(int)); |
|||
} |
|||
|
|||
for (int i=0; i<10; i++) { |
|||
pq_nodes[i].key = &more_keys[i]; |
|||
pq_nodes[i].val = &more_vals[i]; |
|||
pq_nodes[i].i = i; |
|||
pqueue_insert(pq, &pq_nodes[i]); |
|||
} |
|||
|
|||
for (int i=0; i<5; i++) { |
|||
r = pqueue_pop(pq, &node); assert(r==0); |
|||
if ( *(int *)node->key->data != i ) { printf("FAIL\n"); return -1; } |
|||
if (verbose) printf("%d : %d\n", i, *(int*)node->key->data); |
|||
} |
|||
|
|||
int n; |
|||
for (int i=5; i<15; i++) { |
|||
r = pqueue_pop(pq, &node); assert(r==0); |
|||
if ( *(int *)node->key->data != i ) { printf("FAIL\n"); return -1; } |
|||
if (verbose) printf("%d : %d\n", i, *(int*)node->key->data); |
|||
n = node->i; |
|||
pq_nodes[n].key = &more_keys[i+5]; |
|||
pq_nodes[n].val = &more_vals[i+5]; |
|||
pqueue_insert(pq, &pq_nodes[n]); |
|||
} |
|||
|
|||
for (int i=15; i<20; i++) { |
|||
r = pqueue_pop(pq, &node); assert(r==0); |
|||
if ( *(int*)node->key->data != i ) { printf("FAIL\n"); return -1; } |
|||
if (verbose) printf("%d : %d\n", i, *(int*)node->key->data); |
|||
} |
|||
if (verbose) printf("test2 : PASS\n"); |
|||
pqueue_free(pq); |
|||
|
|||
// test 3 : put in a dup |
|||
{ |
|||
r = pqueue_init(&pq, 10, 0, dest_db, compare, &error_callback); |
|||
if (r) return r; |
|||
} |
|||
|
|||
DBT keys3[10]; |
|||
DBT vals3[10]; |
|||
int key_data3[10] = {0, 1, 2, 3, 4, 5, 6, 6, 8, 9}; // dup is 6 |
|||
int val_data3[10]; |
|||
|
|||
for (int i=0; i<10; i++) { |
|||
keys3[i] = zero; |
|||
vals3[i] = zero; |
|||
val_data3[i] = i; |
|||
dbt_init(&keys3[i], &key_data3[i], sizeof(int)); |
|||
dbt_init(&vals3[i], &val_data3[i], sizeof(int)); |
|||
} |
|||
int ii; |
|||
for (ii=0; ii<10; ii++) { |
|||
pq_nodes[ii].key = &keys3[ii]; |
|||
pq_nodes[ii].val = &vals3[ii]; |
|||
pq_nodes[ii].i = ii; |
|||
r = pqueue_insert(pq, &pq_nodes[ii]); |
|||
if ( r != 0 ) goto found_duplicate6; |
|||
} |
|||
for (ii=0; ii<10; ii++) { |
|||
r = pqueue_pop(pq, &node); |
|||
// if (verbose) printf("%d : %d\n", ii, *(int*)node->key->data); |
|||
if ( r != 0 ) goto found_duplicate6; |
|||
} |
|||
found_duplicate6: |
|||
// if (verbose) printf("%d : %d\n", ii, *(int*)node->key->data); |
|||
if ( found_dup != 6 ) { printf("FAIL\n"); return -1; } |
|||
if (verbose) printf("test3 : PASS\n"); |
|||
pqueue_free(pq); |
|||
|
|||
// test 4 - find duplicate when inserting |
|||
r = pqueue_init(&pq, 10, 0, dest_db, compare, &error_callback); if (r) return r; |
|||
|
|||
found_dup = -1; |
|||
DBT keys4[10]; |
|||
DBT vals4[10]; |
|||
int key_data4[10] = {0, 0, 2, 3, 4, 5, 6, 7, 8, 9}; // dup is 0 |
|||
int val_data4[10]; |
|||
|
|||
for (int i=0; i<10; i++) { |
|||
keys4[i] = zero; |
|||
vals4[i] = zero; |
|||
val_data4[i] = i; |
|||
dbt_init(&keys4[i], &key_data4[i], sizeof(int)); |
|||
dbt_init(&vals4[i], &val_data4[i], sizeof(int)); |
|||
} |
|||
|
|||
for (ii=0; ii<10; ii++) { |
|||
pq_nodes[ii].key = &keys4[ii]; |
|||
pq_nodes[ii].val = &vals4[ii]; |
|||
pq_nodes[ii].i = ii; |
|||
r = pqueue_insert(pq, &pq_nodes[ii]); |
|||
if ( r != 0 ) { |
|||
// if (verbose) printf("%d : %d\n", ii, *(int*)pq_nodes[ii].key->data); |
|||
goto found_duplicate0; |
|||
} |
|||
} |
|||
for (ii=0; ii<10; ii++) { |
|||
r = pqueue_pop(pq, &node); |
|||
// if (verbose) printf("%d : %d\n", ii, *(int*)node->key->data); |
|||
if ( r != 0 ) goto found_duplicate0; |
|||
} |
|||
found_duplicate0: |
|||
if ( found_dup != 0 ) { printf("FAIL - found_dup : %d\n", found_dup); return -1; } |
|||
if (verbose) printf("test4 : PASS\n"); |
|||
if (verbose) printf("PASS\n"); |
|||
pqueue_free(pq); |
|||
toku_free(pq_nodes); |
|||
|
|||
return 0; |
|||
} |
|||
|
|||
|
|||
|
|||
int |
|||
test_main (int argc, const char *argv[]) { |
|||
argc--; argv++; |
|||
while (argc>0) { |
|||
if (strcmp(argv[0], "-v")==0) { |
|||
verbose++; |
|||
} |
|||
argc--; |
|||
argv++; |
|||
} |
|||
return run_test(); |
|||
} |
|||
@ -0,0 +1,94 @@ |
|||
#include <string.h> |
|||
#include <stdlib.h> |
|||
#include <unistd.h> |
|||
#include <toku_assert.h> |
|||
#include <toku_pthread.h> |
|||
#include "queue.h" |
|||
|
|||
static int verbose=1; |
|||
|
|||
static int count_0 = 0; |
|||
static u_int64_t e_max_weight=0, d_max_weight = 0; // max weight seen by enqueue thread and dequeue thread respectively. |
|||
|
|||
static void *start_0 (void *arg) { |
|||
QUEUE q = (QUEUE)arg; |
|||
void *item; |
|||
u_int64_t weight; |
|||
long count = 0; |
|||
while (1) { |
|||
u_int64_t this_max_weight; |
|||
int r=queue_deq(q, &item, &weight, &this_max_weight); |
|||
if (r==EOF) break; |
|||
assert(r==0); |
|||
if (this_max_weight>d_max_weight) d_max_weight=this_max_weight; |
|||
long v = (long)item; |
|||
//printf("D(%ld)=%ld %ld\n", v, this_max_weight, d_max_weight); |
|||
assert(v==count); |
|||
count_0++; |
|||
count++; |
|||
} |
|||
return NULL; |
|||
} |
|||
|
|||
static void enq (QUEUE q, long v, u_int64_t weight) { |
|||
u_int64_t this_max_weight; |
|||
int r = queue_enq(q, (void*)v, (weight==0)?0:1, &this_max_weight); |
|||
assert(r==0); |
|||
if (this_max_weight>e_max_weight) e_max_weight=this_max_weight; |
|||
//printf("E(%ld)=%ld %ld\n", v, this_max_weight, e_max_weight); |
|||
} |
|||
|
|||
static void queue_test_0 (u_int64_t weight) |
|||
// Test a queue that can hold WEIGHT items. |
|||
{ |
|||
//printf("\n"); |
|||
count_0 = 0; |
|||
e_max_weight = 0; |
|||
d_max_weight = 0; |
|||
QUEUE q; |
|||
int r; |
|||
r = queue_create(&q, weight); assert(r==0); |
|||
toku_pthread_t thread; |
|||
r = toku_pthread_create(&thread, NULL, start_0, q); assert(r==0); |
|||
enq(q, 0L, weight); |
|||
enq(q, 1L, weight); |
|||
enq(q, 2L, weight); |
|||
enq(q, 3L, weight); |
|||
sleep(1); |
|||
enq(q, 4L, weight); |
|||
enq(q, 5L, weight); |
|||
r = queue_eof(q); assert(r==0); |
|||
void *result; |
|||
r = toku_pthread_join(thread, &result); assert(r==0); |
|||
assert(result==NULL); |
|||
assert(count_0==6); |
|||
r = queue_destroy(q); |
|||
assert(d_max_weight <= weight); |
|||
assert(e_max_weight <= weight); |
|||
} |
|||
|
|||
|
|||
static void parse_args (int argc, const char *argv[]) { |
|||
const char *progname=argv[0]; |
|||
argc--; argv++; |
|||
while (argc>0) { |
|||
if (strcmp(argv[0],"-v")==0) { |
|||
verbose++; |
|||
} else if (strcmp(argv[0],"-q")==0) { |
|||
verbose--; |
|||
} else { |
|||
fprintf(stderr, "Usage:\n %s [-v] [-q]\n", progname); |
|||
exit(1); |
|||
} |
|||
argc--; argv++; |
|||
} |
|||
if (verbose<0) verbose=0; |
|||
} |
|||
|
|||
int main (int argc, const char *argv[]) { |
|||
parse_args(argc, argv); |
|||
queue_test_0(0LL); |
|||
queue_test_0(1LL); |
|||
queue_test_0(2LL); |
|||
return 0; |
|||
} |
|||
Some files were not shown because too many files changed in this diff
Write
Preview
Loading…
Cancel
Save
Reference in new issue