|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
* Copyright 2024 Vsevolod Stakhov |
|
|
|
* Copyright 2025 Vsevolod Stakhov |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -130,7 +130,7 @@ struct lua_redis_request_specific_userdata { |
|
|
|
unsigned int nargs; |
|
|
|
char **args; |
|
|
|
gsize *arglens; |
|
|
|
struct lua_redis_userdata *c; |
|
|
|
struct lua_redis_userdata *common_ud; |
|
|
|
struct lua_redis_ctx *ctx; |
|
|
|
struct lua_redis_request_specific_userdata *next; |
|
|
|
ev_timer timeout_ev; |
|
|
|
@ -262,7 +262,7 @@ lua_redis_fin(void *arg) |
|
|
|
struct lua_redis_ctx *ctx; |
|
|
|
|
|
|
|
ctx = sp_ud->ctx; |
|
|
|
ud = sp_ud->c; |
|
|
|
ud = sp_ud->common_ud; |
|
|
|
|
|
|
|
if (ev_can_stop(&sp_ud->timeout_ev)) { |
|
|
|
ev_timer_stop(sp_ud->ctx->async.event_loop, &sp_ud->timeout_ev); |
|
|
|
@ -290,7 +290,7 @@ lua_redis_push_error(const char *err, |
|
|
|
gboolean connected, |
|
|
|
...) |
|
|
|
{ |
|
|
|
struct lua_redis_userdata *ud = sp_ud->c; |
|
|
|
struct lua_redis_userdata *ud = sp_ud->common_ud; |
|
|
|
struct lua_callback_state cbs; |
|
|
|
lua_State *L; |
|
|
|
|
|
|
|
@ -390,7 +390,7 @@ static void |
|
|
|
lua_redis_push_data(const redisReply *r, struct lua_redis_ctx *ctx, |
|
|
|
struct lua_redis_request_specific_userdata *sp_ud) |
|
|
|
{ |
|
|
|
struct lua_redis_userdata *ud = sp_ud->c; |
|
|
|
struct lua_redis_userdata *ud = sp_ud->common_ud; |
|
|
|
struct lua_callback_state cbs; |
|
|
|
lua_State *L; |
|
|
|
|
|
|
|
@ -467,14 +467,14 @@ lua_redis_callback(redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
redisAsyncContext *ac; |
|
|
|
|
|
|
|
ctx = sp_ud->ctx; |
|
|
|
ud = sp_ud->c; |
|
|
|
ud = sp_ud->common_ud; |
|
|
|
|
|
|
|
if (ud->terminated || !rspamd_lua_is_initialised()) { |
|
|
|
/* We are already at the termination stage, just go out */ |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
msg_debug_lua_redis("got reply from redis %p for query %p", sp_ud->c->ctx, |
|
|
|
msg_debug_lua_redis("got async reply from redis %p for query %p", sp_ud->common_ud->ctx, |
|
|
|
sp_ud); |
|
|
|
|
|
|
|
REDIS_RETAIN(ctx); |
|
|
|
@ -601,7 +601,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) |
|
|
|
int results; |
|
|
|
|
|
|
|
ctx = sp_ud->ctx; |
|
|
|
ud = sp_ud->c; |
|
|
|
ud = sp_ud->common_ud; |
|
|
|
lua_State *L = ctx->async.cfg->lua_state; |
|
|
|
|
|
|
|
sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED; |
|
|
|
@ -620,7 +620,7 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) |
|
|
|
} |
|
|
|
|
|
|
|
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { |
|
|
|
msg_debug_lua_redis("got reply from redis: %p for query %p", ac, sp_ud); |
|
|
|
msg_debug_lua_redis("got sync reply from redis: %p for query %p", ac, sp_ud); |
|
|
|
|
|
|
|
struct lua_redis_result *result = g_malloc0(sizeof *result); |
|
|
|
|
|
|
|
@ -653,17 +653,17 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) |
|
|
|
/* if error happened, we should terminate the connection, |
|
|
|
and release it */ |
|
|
|
|
|
|
|
if (result->is_error && sp_ud->c->ctx) { |
|
|
|
ac = sp_ud->c->ctx; |
|
|
|
if (result->is_error && sp_ud->common_ud->ctx) { |
|
|
|
ac = sp_ud->common_ud->ctx; |
|
|
|
/* Set to NULL to avoid double free in dtor */ |
|
|
|
sp_ud->c->ctx = NULL; |
|
|
|
sp_ud->common_ud->ctx = NULL; |
|
|
|
ctx->flags |= LUA_REDIS_TERMINATED; |
|
|
|
|
|
|
|
/* |
|
|
|
* This will call all callbacks pending so the entire context |
|
|
|
* will be destructed |
|
|
|
*/ |
|
|
|
rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, |
|
|
|
rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac, |
|
|
|
RSPAMD_REDIS_RELEASE_FATAL); |
|
|
|
} |
|
|
|
|
|
|
|
@ -679,6 +679,8 @@ lua_redis_callback_sync(redisAsyncContext *ac, gpointer r, gpointer priv) |
|
|
|
ctx->cmds_pending--; |
|
|
|
|
|
|
|
if (ctx->cmds_pending == 0) { |
|
|
|
msg_debug_lua_redis("no more commands left for: %p for query %p", ac, sp_ud); |
|
|
|
|
|
|
|
if (ctx->thread) { |
|
|
|
if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) { |
|
|
|
/* somebody yielded and waits for results */ |
|
|
|
@ -717,16 +719,16 @@ lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents) |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
ud = sp_ud->c; |
|
|
|
ud = sp_ud->common_ud; |
|
|
|
ctx = sp_ud->ctx; |
|
|
|
msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, |
|
|
|
sp_ud->c->ctx); |
|
|
|
sp_ud->common_ud->ctx); |
|
|
|
|
|
|
|
if (sp_ud->c->ctx) { |
|
|
|
ac = sp_ud->c->ctx; |
|
|
|
if (sp_ud->common_ud->ctx) { |
|
|
|
ac = sp_ud->common_ud->ctx; |
|
|
|
|
|
|
|
/* Set to NULL to avoid double free in dtor */ |
|
|
|
sp_ud->c->ctx = NULL; |
|
|
|
sp_ud->common_ud->ctx = NULL; |
|
|
|
ac->err = REDIS_ERR_IO; |
|
|
|
errno = ETIMEDOUT; |
|
|
|
ctx->flags |= LUA_REDIS_TERMINATED; |
|
|
|
@ -735,7 +737,7 @@ lua_redis_timeout_sync(EV_P_ ev_timer *w, int revents) |
|
|
|
* This will call all callbacks pending so the entire context |
|
|
|
* will be destructed |
|
|
|
*/ |
|
|
|
rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, |
|
|
|
rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac, |
|
|
|
RSPAMD_REDIS_RELEASE_FATAL); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -754,24 +756,24 @@ lua_redis_timeout(EV_P_ ev_timer *w, int revents) |
|
|
|
} |
|
|
|
|
|
|
|
ctx = sp_ud->ctx; |
|
|
|
ud = sp_ud->c; |
|
|
|
ud = sp_ud->common_ud; |
|
|
|
|
|
|
|
REDIS_RETAIN(ctx); |
|
|
|
msg_debug_lua_redis("timeout while querying redis server: %p, redis: %p", sp_ud, |
|
|
|
sp_ud->c->ctx); |
|
|
|
sp_ud->common_ud->ctx); |
|
|
|
lua_redis_push_error("timeout while connecting the server (%.2f sec)", ctx, sp_ud, TRUE, ud->timeout); |
|
|
|
|
|
|
|
if (sp_ud->c->ctx) { |
|
|
|
ac = sp_ud->c->ctx; |
|
|
|
if (sp_ud->common_ud->ctx) { |
|
|
|
ac = sp_ud->common_ud->ctx; |
|
|
|
/* Set to NULL to avoid double free in dtor */ |
|
|
|
sp_ud->c->ctx = NULL; |
|
|
|
sp_ud->common_ud->ctx = NULL; |
|
|
|
ac->err = REDIS_ERR_IO; |
|
|
|
errno = ETIMEDOUT; |
|
|
|
/* |
|
|
|
* This will call all callbacks pending so the entire context |
|
|
|
* will be destructed |
|
|
|
*/ |
|
|
|
rspamd_redis_pool_release_connection(sp_ud->c->pool, ac, |
|
|
|
rspamd_redis_pool_release_connection(sp_ud->common_ud->pool, ac, |
|
|
|
RSPAMD_REDIS_RELEASE_FATAL); |
|
|
|
} |
|
|
|
|
|
|
|
@ -1095,8 +1097,8 @@ rspamd_lua_redis_prepare_connection(lua_State *L, int *pcbref, gboolean is_async |
|
|
|
return NULL; |
|
|
|
} |
|
|
|
|
|
|
|
msg_debug_lua_redis("opened redis connection host=%s; ctx=%p; ud=%p", |
|
|
|
host, ctx, ud); |
|
|
|
msg_debug_lua_redis("opened redis connection host=%s; lua_ctx=%p; redis_ctx=%p; ud=%p", |
|
|
|
host, ctx, ud->ctx, ud); |
|
|
|
|
|
|
|
return ctx; |
|
|
|
} |
|
|
|
@ -1137,7 +1139,7 @@ lua_redis_make_request(lua_State *L) |
|
|
|
ud = &ctx->async; |
|
|
|
sp_ud = g_malloc0(sizeof(*sp_ud)); |
|
|
|
sp_ud->cbref = cbref; |
|
|
|
sp_ud->c = ud; |
|
|
|
sp_ud->common_ud = ud; |
|
|
|
sp_ud->ctx = ctx; |
|
|
|
|
|
|
|
lua_pushstring(L, "cmd"); |
|
|
|
@ -1501,21 +1503,18 @@ lua_redis_add_cmd(lua_State *L) |
|
|
|
} |
|
|
|
|
|
|
|
sp_ud = g_malloc0(sizeof(*sp_ud)); |
|
|
|
sp_ud->common_ud = &ctx->async; |
|
|
|
ud = &ctx->async; |
|
|
|
if (IS_ASYNC(ctx)) { |
|
|
|
sp_ud->c = &ctx->async; |
|
|
|
ud = &ctx->async; |
|
|
|
sp_ud->cbref = cbref; |
|
|
|
} |
|
|
|
else { |
|
|
|
sp_ud->c = &ctx->async; |
|
|
|
ud = &ctx->async; |
|
|
|
} |
|
|
|
|
|
|
|
sp_ud->ctx = ctx; |
|
|
|
|
|
|
|
lua_redis_parse_args(L, args_pos, cmd, &sp_ud->args, |
|
|
|
&sp_ud->arglens, &sp_ud->nargs); |
|
|
|
|
|
|
|
LL_PREPEND(sp_ud->c->specific, sp_ud); |
|
|
|
LL_PREPEND(sp_ud->common_ud->specific, sp_ud); |
|
|
|
|
|
|
|
if (ud->s && rspamd_session_blocked(ud->s)) { |
|
|
|
lua_pushboolean(L, 0); |
|
|
|
@ -1525,7 +1524,7 @@ lua_redis_add_cmd(lua_State *L) |
|
|
|
} |
|
|
|
|
|
|
|
if (IS_ASYNC(ctx)) { |
|
|
|
ret = redisAsyncCommandArgv(sp_ud->c->ctx, |
|
|
|
ret = redisAsyncCommandArgv(sp_ud->common_ud->ctx, |
|
|
|
lua_redis_callback, |
|
|
|
sp_ud, |
|
|
|
sp_ud->nargs, |
|
|
|
@ -1533,7 +1532,7 @@ lua_redis_add_cmd(lua_State *L) |
|
|
|
sp_ud->arglens); |
|
|
|
} |
|
|
|
else { |
|
|
|
ret = redisAsyncCommandArgv(sp_ud->c->ctx, |
|
|
|
ret = redisAsyncCommandArgv(sp_ud->common_ud->ctx, |
|
|
|
lua_redis_callback_sync, |
|
|
|
sp_ud, |
|
|
|
sp_ud->nargs, |
|
|
|
@ -1554,25 +1553,28 @@ lua_redis_add_cmd(lua_State *L) |
|
|
|
} |
|
|
|
|
|
|
|
sp_ud->timeout_ev.data = sp_ud; |
|
|
|
ev_now_update_if_cheap(ud->event_loop); |
|
|
|
|
|
|
|
if (IS_ASYNC(ctx)) { |
|
|
|
ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout, |
|
|
|
sp_ud->c->timeout, 0.0); |
|
|
|
sp_ud->common_ud->timeout, 0.0); |
|
|
|
} |
|
|
|
else { |
|
|
|
ev_timer_init(&sp_ud->timeout_ev, lua_redis_timeout_sync, |
|
|
|
sp_ud->c->timeout, 0.0); |
|
|
|
sp_ud->common_ud->timeout, 0.0); |
|
|
|
} |
|
|
|
|
|
|
|
ev_timer_start(ud->event_loop, &sp_ud->timeout_ev); |
|
|
|
msg_debug_lua_redis("added timeout %f for %p", sp_ud->common_ud->timeout, sp_ud); |
|
|
|
|
|
|
|
REDIS_RETAIN(ctx); |
|
|
|
ctx->cmds_pending++; |
|
|
|
} |
|
|
|
else { |
|
|
|
msg_info("call to redis failed: %s", |
|
|
|
sp_ud->c->ctx->errstr); |
|
|
|
sp_ud->common_ud->ctx->errstr); |
|
|
|
lua_pushboolean(L, 0); |
|
|
|
lua_pushstring(L, sp_ud->c->ctx->errstr); |
|
|
|
lua_pushstring(L, sp_ud->common_ud->ctx->errstr); |
|
|
|
|
|
|
|
return 2; |
|
|
|
} |
|
|
|
@ -1606,11 +1608,20 @@ lua_redis_exec(lua_State *L) |
|
|
|
return 0; |
|
|
|
} |
|
|
|
else { |
|
|
|
if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) == 0) { |
|
|
|
struct lua_redis_userdata *ud = &ctx->async; |
|
|
|
int replies_pending = g_queue_get_length(ctx->replies); |
|
|
|
|
|
|
|
msg_debug_lua_redis("execute pending commands for %p; commands pending = %d; replies pending = %d", |
|
|
|
ctx, |
|
|
|
ctx->cmds_pending, |
|
|
|
replies_pending); |
|
|
|
|
|
|
|
if (ctx->cmds_pending == 0 && replies_pending == 0) { |
|
|
|
lua_pushstring(L, "No pending commands to execute"); |
|
|
|
lua_error(L); |
|
|
|
} |
|
|
|
if (ctx->cmds_pending == 0 && g_queue_get_length(ctx->replies) > 0) { |
|
|
|
|
|
|
|
if (ctx->cmds_pending == 0 && replies_pending > 0) { |
|
|
|
int results = lua_redis_push_results(ctx, L); |
|
|
|
return results; |
|
|
|
} |
|
|
|
|