From 8bb9f47872e0e581aa9f9a0998699da7c4f00d51 Mon Sep 17 00:00:00 2001 From: Yoni Fogel Date: Wed, 30 Jul 2008 15:26:52 +0000 Subject: [PATCH] Closes #1060 Implemented c_getf_(first,last) git-svn-id: file:///svn/tokudb@5434 c7de825b-a66e-492c-adef-691d508d4ae1 --- src/ydb.c | 102 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 80 insertions(+), 22 deletions(-) diff --git a/src/ydb.c b/src/ydb.c index 144268dd96c..1fe9be2cde9 100644 --- a/src/ydb.c +++ b/src/ydb.c @@ -90,6 +90,10 @@ static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is /* txn methods */ /* lightweight cursor methods. */ +static int toku_c_getf_first(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra); + +static int toku_c_getf_last(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra); + static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra); static int toku_c_getf_prev(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra); @@ -1765,6 +1769,14 @@ static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) { return r; } +static int locked_c_getf_first(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { + toku_ydb_lock(); int r = toku_c_getf_first(c, flag, f, extra); toku_ydb_unlock(); return r; +} + +static int locked_c_getf_last(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { + toku_ydb_lock(); int r = toku_c_getf_last(c, flag, f, extra); toku_ydb_unlock(); return r; +} + static int locked_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { toku_ydb_lock(); int r = toku_c_getf_next(c, flag, f, extra); toku_ydb_unlock(); return r; } @@ -1777,22 +1789,79 @@ static int locked_c_getf_next_dup(DBC *c, u_int32_t flag, void(*f)(DBT const *ke toku_ydb_lock(); int r = toku_c_getf_next_dup(c, flag, f, extra); toku_ydb_unlock(); return r; } -static int toku_c_getf_next_old(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { - DBT key,val; - memset(&key, 0, sizeof(key)); - memset(&val, 0, sizeof(val)); - +static int toku_c_getf_first(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { + HANDLE_PANICKED_DB(c->dbp); + u_int32_t lock_flags = get_prelocked_flags(flag); + flag &= ~lock_flags; + assert(flag==0); + TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; + const DBT *pkey, *pval; + pkey = pval = toku_lt_infinity; + int r; + + DB *db=c->dbp; + toku_lock_tree* lt = db->i->lt; + BOOL do_locking = lt!=NULL && !lock_flags; + + int c_get_result = toku_brt_cursor_get(c->i->c, NULL, NULL, DB_FIRST, txn); + if (c_get_result!=0 && c_get_result!=DB_NOTFOUND) { r = c_get_result; goto cleanup; } + int found = c_get_result==0; + if (found) brt_cursor_peek_current(c->i->c, &pkey, &pval); + if (do_locking) { + DB_TXN *txn_anc = toku_txn_ancestor(c->i->txn); + r = toku_txn_add_lt(txn_anc, lt); + if (r!=0) goto cleanup; + r = toku_lt_acquire_range_read_lock(lt, db, toku_txn_get_txnid(txn_anc->i->tokutxn), + toku_lt_neg_infinity, toku_lt_neg_infinity, + pkey, pval); + if (r!=0) goto cleanup; + } + if (found) { + f(pkey, pval, extra); + } + r = c_get_result; +cleanup: + return r; +} + +static int toku_c_getf_last(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { + HANDLE_PANICKED_DB(c->dbp); u_int32_t lock_flags = get_prelocked_flags(flag); flag &= ~lock_flags; assert(flag==0); - int r = toku_c_get_noassociate(c, &key, &val, DB_NEXT | flag | lock_flags); - if (r==0) f(&key, &val, extra); + TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; + const DBT *pkey, *pval; + pkey = pval = toku_lt_neg_infinity; + int r; + + DB *db=c->dbp; + toku_lock_tree* lt = db->i->lt; + BOOL do_locking = lt!=NULL && !lock_flags; + + int c_get_result = toku_brt_cursor_get(c->i->c, NULL, NULL, DB_LAST, txn); + if (c_get_result!=0 && c_get_result!=DB_NOTFOUND) { r = c_get_result; goto cleanup; } + int found = c_get_result==0; + if (found) brt_cursor_peek_current(c->i->c, &pkey, &pval); + if (do_locking) { + DB_TXN *txn_anc = toku_txn_ancestor(c->i->txn); + r = toku_txn_add_lt(txn_anc, lt); + if (r!=0) goto cleanup; + r = toku_lt_acquire_range_read_lock(lt, db, toku_txn_get_txnid(txn_anc->i->tokutxn), + pkey, pval, + toku_lt_infinity, toku_lt_infinity); + if (r!=0) goto cleanup; + } + if (found) { + f(pkey, pval, extra); + } + r = c_get_result; +cleanup: return r; } static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { HANDLE_PANICKED_DB(c->dbp); - if (toku_c_uninitialized(c)) return toku_c_getf_next_old(c, flag, f, extra); //return toku_c_getf_first(c, flag, f, extra); + if (toku_c_uninitialized(c)) return toku_c_getf_first(c, flag, f, extra); u_int32_t lock_flags = get_prelocked_flags(flag); flag &= ~lock_flags; assert(flag==0); @@ -1835,22 +1904,9 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT return r; } -static int toku_c_getf_prev_old(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { - DBT key,val; - memset(&key, 0, sizeof(key)); - memset(&val, 0, sizeof(val)); - - u_int32_t lock_flags = get_prelocked_flags(flag); - flag &= ~lock_flags; - assert(flag==0); - int r = toku_c_get_noassociate(c, &key, &val, DB_PREV | flag | lock_flags); - if (r==0) f(&key, &val, extra); - return r; -} - static int toku_c_getf_prev(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) { HANDLE_PANICKED_DB(c->dbp); - if (toku_c_uninitialized(c)) return toku_c_getf_prev_old(c, flag, f, extra); //return toku_c_getf_last(c, flag, f, extra); + if (toku_c_uninitialized(c)) return toku_c_getf_last(c, flag, f, extra); u_int32_t lock_flags = get_prelocked_flags(flag); flag &= ~lock_flags; assert(flag==0); @@ -2367,6 +2423,8 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int SCRS(c_close); SCRS(c_del); SCRS(c_count); + SCRS(c_getf_first); + SCRS(c_getf_last); SCRS(c_getf_next); SCRS(c_getf_prev); SCRS(c_getf_next_dup);