Browse Source

[Fix] Store reference of upstream list in upstreams objects

pull/3472/head
Vsevolod Stakhov 5 years ago
parent
commit
0d49f9163e
  1. 3
      lualib/lua_redis.lua
  2. 111
      src/lua/lua_upstream.c

3
lualib/lua_redis.lua

@ -64,7 +64,8 @@ local function redis_query_sentinel(ev_base, params, initialised)
end end
-- Coroutines syntax -- Coroutines syntax
local rspamd_redis = require "rspamd_redis" local rspamd_redis = require "rspamd_redis"
local addr = params.sentinels:get_upstream_round_robin()
local sentinels = params.sentinels
local addr = sentinels:get_upstream_round_robin()
local host = addr:get_addr() local host = addr:get_addr()
local masters = {} local masters = {}

111
src/lua/lua_upstream.c

@ -78,24 +78,31 @@ static const struct luaL_reg upstream_list_f[] = {
LUA_FUNCTION_DEF (upstream, ok); LUA_FUNCTION_DEF (upstream, ok);
LUA_FUNCTION_DEF (upstream, fail); LUA_FUNCTION_DEF (upstream, fail);
LUA_FUNCTION_DEF (upstream, get_addr); LUA_FUNCTION_DEF (upstream, get_addr);
LUA_FUNCTION_DEF (upstream, destroy);
static const struct luaL_reg upstream_m[] = { static const struct luaL_reg upstream_m[] = {
LUA_INTERFACE_DEF (upstream, ok), LUA_INTERFACE_DEF (upstream, ok),
LUA_INTERFACE_DEF (upstream, fail), LUA_INTERFACE_DEF (upstream, fail),
LUA_INTERFACE_DEF (upstream, get_addr), LUA_INTERFACE_DEF (upstream, get_addr),
{"__tostring", rspamd_lua_class_tostring}, {"__tostring", rspamd_lua_class_tostring},
{"__gc", lua_upstream_destroy},
{NULL, NULL} {NULL, NULL}
}; };
/* Upstream class */ /* Upstream class */
static struct upstream *
struct rspamd_lua_upstream {
struct upstream *up;
gint upref;
};
static struct rspamd_lua_upstream *
lua_check_upstream (lua_State * L) lua_check_upstream (lua_State * L)
{ {
void *ud = rspamd_lua_check_udata (L, 1, "rspamd{upstream}"); void *ud = rspamd_lua_check_udata (L, 1, "rspamd{upstream}");
luaL_argcheck (L, ud != NULL, 1, "'upstream' expected"); luaL_argcheck (L, ud != NULL, 1, "'upstream' expected");
return ud ? *((struct upstream **)ud) : NULL;
return ud ? (struct rspamd_lua_upstream *)ud : NULL;
} }
/*** /***
@ -107,10 +114,10 @@ static gint
lua_upstream_get_addr (lua_State *L) lua_upstream_get_addr (lua_State *L)
{ {
LUA_TRACE_POINT; LUA_TRACE_POINT;
struct upstream *up = lua_check_upstream (L);
struct rspamd_lua_upstream *up = lua_check_upstream (L);
if (up) { if (up) {
rspamd_lua_ip_push (L, rspamd_upstream_addr_next (up));
rspamd_lua_ip_push (L, rspamd_upstream_addr_next (up->up));
} }
else { else {
lua_pushnil (L); lua_pushnil (L);
@ -127,7 +134,7 @@ static gint
lua_upstream_fail (lua_State *L) lua_upstream_fail (lua_State *L)
{ {
LUA_TRACE_POINT; LUA_TRACE_POINT;
struct upstream *up = lua_check_upstream (L);
struct rspamd_lua_upstream *up = lua_check_upstream (L);
gboolean fail_addr = FALSE; gboolean fail_addr = FALSE;
const gchar *reason = "unknown"; const gchar *reason = "unknown";
@ -144,7 +151,7 @@ lua_upstream_fail (lua_State *L)
reason = lua_tostring (L, 2); reason = lua_tostring (L, 2);
} }
rspamd_upstream_fail (up, fail_addr, reason);
rspamd_upstream_fail (up->up, fail_addr, reason);
} }
return 0; return 0;
@ -158,10 +165,25 @@ static gint
lua_upstream_ok (lua_State *L) lua_upstream_ok (lua_State *L)
{ {
LUA_TRACE_POINT; LUA_TRACE_POINT;
struct upstream *up = lua_check_upstream (L);
struct rspamd_lua_upstream *up = lua_check_upstream (L);
if (up) {
rspamd_upstream_ok (up->up);
}
return 0;
}
static gint
lua_upstream_destroy (lua_State *L)
{
LUA_TRACE_POINT;
struct rspamd_lua_upstream *up = lua_check_upstream (L);
if (up) { if (up) {
rspamd_upstream_ok (up);
/* Remove reference to the parent */
luaL_unref (L, LUA_REGISTRYINDEX, up->upref);
/* Upstream belongs to the upstream list, so no free here */
} }
return 0; return 0;
@ -178,6 +200,25 @@ lua_check_upstream_list (lua_State * L)
return ud ? *((struct upstream_list **)ud) : NULL; return ud ? *((struct upstream_list **)ud) : NULL;
} }
static struct rspamd_lua_upstream *
lua_push_upstream (lua_State * L, gint up_idx, struct upstream *up)
{
struct rspamd_lua_upstream *lua_ups;
if (up_idx < 0) {
up_idx = lua_gettop (L) + up_idx + 1;
}
lua_ups = lua_newuserdata (L, sizeof (*lua_ups));
lua_ups->up = up;
rspamd_lua_setclass (L, "rspamd{upstream}", -1);
/* Store parent in the upstream to prevent gc */
lua_pushvalue (L, up_idx);
lua_ups->upref = luaL_ref (L, LUA_REGISTRYINDEX);
return lua_ups;
}
/*** /***
* @function upstream_list.create(cfg, def, [default_port]) * @function upstream_list.create(cfg, def, [default_port])
* Create new upstream list from its string definition in form `<upstream>,<upstream>;<upstream>` * Create new upstream list from its string definition in form `<upstream>,<upstream>;<upstream>`
@ -276,7 +317,7 @@ lua_upstream_list_get_upstream_by_hash (lua_State *L)
{ {
LUA_TRACE_POINT; LUA_TRACE_POINT;
struct upstream_list *upl; struct upstream_list *upl;
struct upstream *selected, **pselected;
struct upstream *selected;
const gchar *key; const gchar *key;
gsize keyl; gsize keyl;
@ -286,10 +327,9 @@ lua_upstream_list_get_upstream_by_hash (lua_State *L)
if (key) { if (key) {
selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_HASHED, key, selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_HASHED, key,
(guint)keyl); (guint)keyl);
if (selected) { if (selected) {
pselected = lua_newuserdata (L, sizeof (struct upstream *));
rspamd_lua_setclass (L, "rspamd{upstream}", -1);
*pselected = selected;
lua_push_upstream (L, 1, selected);
} }
else { else {
lua_pushnil (L); lua_pushnil (L);
@ -316,16 +356,14 @@ lua_upstream_list_get_upstream_round_robin (lua_State *L)
{ {
LUA_TRACE_POINT; LUA_TRACE_POINT;
struct upstream_list *upl; struct upstream_list *upl;
struct upstream *selected, **pselected;
struct upstream *selected;
upl = lua_check_upstream_list (L); upl = lua_check_upstream_list (L);
if (upl) { if (upl) {
selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); selected = rspamd_upstream_get (upl, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
if (selected) { if (selected) {
pselected = lua_newuserdata (L, sizeof (struct upstream *));
rspamd_lua_setclass (L, "rspamd{upstream}", -1);
*pselected = selected;
lua_push_upstream (L, 1, selected);
} }
else { else {
lua_pushnil (L); lua_pushnil (L);
@ -348,7 +386,7 @@ lua_upstream_list_get_upstream_master_slave (lua_State *L)
{ {
LUA_TRACE_POINT; LUA_TRACE_POINT;
struct upstream_list *upl; struct upstream_list *upl;
struct upstream *selected, **pselected;
struct upstream *selected;
upl = lua_check_upstream_list (L); upl = lua_check_upstream_list (L);
if (upl) { if (upl) {
@ -357,9 +395,7 @@ lua_upstream_list_get_upstream_master_slave (lua_State *L)
NULL, NULL,
0); 0);
if (selected) { if (selected) {
pselected = lua_newuserdata (L, sizeof (struct upstream *));
rspamd_lua_setclass (L, "rspamd{upstream}", -1);
*pselected = selected;
lua_push_upstream (L, 1, selected);
} }
else { else {
lua_pushnil (L); lua_pushnil (L);
@ -372,16 +408,17 @@ lua_upstream_list_get_upstream_master_slave (lua_State *L)
return 1; return 1;
} }
struct upstream_foreach_cbdata {
lua_State *L;
gint ups_pos;
};
static void lua_upstream_inserter (struct upstream *up, guint idx, void *ud) static void lua_upstream_inserter (struct upstream *up, guint idx, void *ud)
{ {
struct upstream **pup;
lua_State *L = (lua_State *)ud;
pup = lua_newuserdata (L, sizeof (struct upstream *));
rspamd_lua_setclass (L, "rspamd{upstream}", -1);
*pup = up;
struct upstream_foreach_cbdata *cbd = (struct upstream_foreach_cbdata *)ud;
lua_rawseti (L, -2, idx + 1);
lua_push_upstream (cbd->L, cbd->ups_pos, up);
lua_rawseti (cbd->L, -2, idx + 1);
} }
/*** /***
* @method upstream_list:all_upstreams() * @method upstream_list:all_upstreams()
@ -393,11 +430,15 @@ lua_upstream_list_all_upstreams (lua_State *L)
{ {
LUA_TRACE_POINT; LUA_TRACE_POINT;
struct upstream_list *upl; struct upstream_list *upl;
struct upstream_foreach_cbdata cbd;
upl = lua_check_upstream_list (L); upl = lua_check_upstream_list (L);
if (upl) { if (upl) {
cbd.L = L;
cbd.ups_pos = 1;
lua_createtable (L, rspamd_upstreams_count (upl), 0); lua_createtable (L, rspamd_upstreams_count (upl), 0);
rspamd_upstreams_foreach (upl, lua_upstream_inserter, L);
rspamd_upstreams_foreach (upl, lua_upstream_inserter, &cbd);
} }
else { else {
return luaL_error (L, "invalid arguments"); return luaL_error (L, "invalid arguments");
@ -458,6 +499,7 @@ lua_upstream_flag_to_str (enum rspamd_upstreams_watch_event fl)
struct rspamd_lua_upstream_watcher_cbdata { struct rspamd_lua_upstream_watcher_cbdata {
lua_State *L; lua_State *L;
gint cbref; gint cbref;
gint parent_cbref; /* Reference to the upstream list */
struct upstream_list *upl; struct upstream_list *upl;
}; };
@ -470,7 +512,6 @@ lua_upstream_watch_func (struct upstream *up,
struct rspamd_lua_upstream_watcher_cbdata *cdata = struct rspamd_lua_upstream_watcher_cbdata *cdata =
(struct rspamd_lua_upstream_watcher_cbdata *)ud; (struct rspamd_lua_upstream_watcher_cbdata *)ud;
lua_State *L; lua_State *L;
struct upstream **pup;
const gchar *what; const gchar *what;
gint err_idx; gint err_idx;
@ -481,9 +522,14 @@ lua_upstream_watch_func (struct upstream *up,
lua_rawgeti (L, LUA_REGISTRYINDEX, cdata->cbref); lua_rawgeti (L, LUA_REGISTRYINDEX, cdata->cbref);
lua_pushstring (L, what); lua_pushstring (L, what);
pup = lua_newuserdata (L, sizeof (*pup));
*pup = up;
struct rspamd_lua_upstream *lua_ups = lua_newuserdata (L, sizeof (*lua_ups));
lua_ups->up = up;
rspamd_lua_setclass (L, "rspamd{upstream}", -1); rspamd_lua_setclass (L, "rspamd{upstream}", -1);
/* Store parent in the upstream to prevent gc */
lua_rawgeti (L, LUA_REGISTRYINDEX, cdata->parent_cbref);
lua_ups->upref = luaL_ref (L, LUA_REGISTRYINDEX);
lua_pushinteger (L, cur_errors); lua_pushinteger (L, cur_errors);
if (lua_pcall (L, 3, 0, err_idx) != 0) { if (lua_pcall (L, 3, 0, err_idx) != 0) {
@ -503,6 +549,7 @@ lua_upstream_watch_dtor (gpointer ud)
(struct rspamd_lua_upstream_watcher_cbdata *)ud; (struct rspamd_lua_upstream_watcher_cbdata *)ud;
luaL_unref (cdata->L, LUA_REGISTRYINDEX, cdata->cbref); luaL_unref (cdata->L, LUA_REGISTRYINDEX, cdata->cbref);
luaL_unref (cdata->L, LUA_REGISTRYINDEX, cdata->parent_cbref);
g_free (cdata); g_free (cdata);
} }
@ -554,6 +601,8 @@ lua_upstream_list_add_watcher (lua_State *L)
cdata->cbref = luaL_ref (L, LUA_REGISTRYINDEX); cdata->cbref = luaL_ref (L, LUA_REGISTRYINDEX);
cdata->L = L; cdata->L = L;
cdata->upl = upl; cdata->upl = upl;
lua_pushvalue (L, 1); /* upstream list itself */
cdata->parent_cbref = luaL_ref (L, LUA_REGISTRYINDEX);
rspamd_upstreams_add_watch_callback (upl, flags, rspamd_upstreams_add_watch_callback (upl, flags,
lua_upstream_watch_func, lua_upstream_watch_dtor, cdata); lua_upstream_watch_func, lua_upstream_watch_dtor, cdata);

Loading…
Cancel
Save