Browse Source

[Project] Add support for granular timeouts in plugins and maps

pull/5614/head
Vsevolod Stakhov 4 weeks ago
parent
commit
2a37e68144
No known key found for this signature in database. GPG Key ID: 7647B6790081437
  1. 65
      lualib/lua_clickhouse.lua
  2. 10
      lualib/plugins/neural/providers/llm.lua
  3. 64
      src/libserver/maps/map.c
  4. 9
      src/libserver/maps/map_private.h
  5. 30
      src/plugins/lua/bimi.lua
  6. 90
      src/plugins/lua/contextal.lua
  7. 15
      src/plugins/lua/gpt.lua
  8. 25
      src/plugins/lua/metadata_exporter.lua

65
lualib/lua_clickhouse.lua

@ -13,7 +13,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
]]--
]] --
--[[[
-- @module lua_clickhouse
@ -75,7 +75,6 @@ end
-- Converts a row into TSV, taking extra care about arrays
local function row_to_tsv(row)
for i, elt in ipairs(row) do
local t = type(elt)
if t == 'table' then
@ -185,8 +184,8 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb)
fail_cb(params, err_message, data)
else
rspamd_logger.errx(params.log_obj,
"request failed on clickhouse server %s: %s",
ip_addr, err_message)
"request failed on clickhouse server %s: %s",
ip_addr, err_message)
end
upstream:fail()
else
@ -198,8 +197,8 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb)
ok_cb(params, rows)
else
lua_util.debugm(N, params.log_obj,
"http_select_cb ok: %s, %s, %s, %s", err_message, code,
data:gsub('[\n%s]+', ' '), _)
"http_select_cb ok: %s, %s, %s, %s", err_message, code,
data:gsub('[\n%s]+', ' '), _)
end
else
if fail_cb then
@ -207,8 +206,8 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb)
else
local ip_addr = upstream:get_addr():to_string(true)
rspamd_logger.errx(params.log_obj,
"request failed on clickhouse server %s: %s",
ip_addr, 'failed to parse reply')
"request failed on clickhouse server %s: %s",
ip_addr, 'failed to parse reply')
end
end
end
@ -230,8 +229,8 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
fail_cb(params, err_message, data)
else
rspamd_logger.errx(params.log_obj,
"request failed on clickhouse server %s: %s",
ip_addr, err_message)
"request failed on clickhouse server %s: %s",
ip_addr, err_message)
end
upstream:fail()
else
@ -245,11 +244,10 @@ local function mk_http_insert_cb(upstream, params, ok_cb, fail_cb)
else
ok_cb(params, parsed)
end
else
lua_util.debugm(N, params.log_obj,
"http_insert_cb ok: %s, %s, %s, %s", err_message, code,
data:gsub('[\n%s]+', ' '), _)
"http_insert_cb ok: %s, %s, %s, %s", err_message, code,
data:gsub('[\n%s]+', ' '), _)
end
end
end
@ -294,6 +292,11 @@ exports.select = function(upstream, settings, params, query, ok_cb, fail_cb, row
http_params.body = query
http_params.log_obj = params.task or params.config
http_params.opaque_body = true
-- staged timeouts
http_params.connect_timeout = settings.connect_timeout
http_params.ssl_timeout = settings.ssl_timeout
http_params.write_timeout = settings.write_timeout
http_params.read_timeout = settings.read_timeout
lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
@ -305,7 +308,7 @@ exports.select = function(upstream, settings, params, query, ok_cb, fail_cb, row
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
connect_prefix, ip_addr, escape_spaces(database))
connect_prefix, ip_addr, escape_spaces(database))
end
return rspamd_http.request(http_params)
@ -349,6 +352,11 @@ exports.select_sync = function(upstream, settings, params, query, row_cb)
http_params.body = query
http_params.log_obj = params.task or params.config
http_params.opaque_body = true
-- staged timeouts
http_params.connect_timeout = settings.connect_timeout
http_params.ssl_timeout = settings.ssl_timeout
http_params.write_timeout = settings.write_timeout
http_params.read_timeout = settings.read_timeout
lua_util.debugm(N, http_params.log_obj, "clickhouse select request: %s", http_params.body)
@ -360,7 +368,7 @@ exports.select_sync = function(upstream, settings, params, query, row_cb)
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
connect_prefix, ip_addr, escape_spaces(database))
connect_prefix, ip_addr, escape_spaces(database))
end
local err, response = rspamd_http.request(http_params)
@ -414,6 +422,11 @@ exports.insert = function(upstream, settings, params, query, rows,
http_params.method = 'POST'
http_params.body = { rspamd_text.fromtable(rows, '\n'), '\n' }
http_params.log_obj = params.task or params.config
-- staged timeouts
http_params.connect_timeout = settings.connect_timeout
http_params.ssl_timeout = settings.ssl_timeout
http_params.write_timeout = settings.write_timeout
http_params.read_timeout = settings.read_timeout
if not http_params.url then
local connect_prefix = "http://"
@ -423,10 +436,10 @@ exports.insert = function(upstream, settings, params, query, rows,
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&query=%s%%20FORMAT%%20TabSeparated',
connect_prefix,
ip_addr,
escape_spaces(database),
escape_spaces(query))
connect_prefix,
ip_addr,
escape_spaces(database),
escape_spaces(query))
end
return rspamd_http.request(http_params)
@ -468,6 +481,11 @@ exports.generic = function(upstream, settings, params, query,
http_params.password = settings.password
http_params.log_obj = params.task or params.config
http_params.body = query
-- staged timeouts
http_params.connect_timeout = settings.connect_timeout
http_params.ssl_timeout = settings.ssl_timeout
http_params.write_timeout = settings.write_timeout
http_params.read_timeout = settings.read_timeout
if not http_params.url then
local connect_prefix = "http://"
@ -477,7 +495,7 @@ exports.generic = function(upstream, settings, params, query,
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&default_format=JSONEachRow',
connect_prefix, ip_addr, escape_spaces(database))
connect_prefix, ip_addr, escape_spaces(database))
end
return rspamd_http.request(http_params)
@ -515,6 +533,11 @@ exports.generic_sync = function(upstream, settings, params, query)
http_params.password = settings.password
http_params.log_obj = params.task or params.config
http_params.body = query
-- staged timeouts
http_params.connect_timeout = settings.connect_timeout
http_params.ssl_timeout = settings.ssl_timeout
http_params.write_timeout = settings.write_timeout
http_params.read_timeout = settings.read_timeout
if not http_params.url then
local connect_prefix = "http://"
@ -524,7 +547,7 @@ exports.generic_sync = function(upstream, settings, params, query)
local ip_addr = upstream:get_addr():to_string(true)
local database = settings.database or 'default'
http_params.url = string.format('%s%s/?database=%s&default_format=JSON',
connect_prefix, ip_addr, escape_spaces(database))
connect_prefix, ip_addr, escape_spaces(database))
end
local err, response = rspamd_http.request(http_params)

10
lualib/plugins/neural/providers/llm.lua

@ -45,6 +45,11 @@ local function compose_llm_settings(pcfg)
cache_prefix = pcfg.cache_prefix or 'neural_llm',
cache_hash_len = pcfg.cache_hash_len or 32,
cache_use_hashing = (pcfg.cache_use_hashing ~= false),
-- Optional staged timeouts (inherit from global gpt if present)
connect_timeout = pcfg.connect_timeout or gpt_settings.connect_timeout,
ssl_timeout = pcfg.ssl_timeout or gpt_settings.ssl_timeout,
write_timeout = pcfg.write_timeout or gpt_settings.write_timeout,
read_timeout = pcfg.read_timeout or gpt_settings.read_timeout,
}
end
@ -182,6 +187,11 @@ neural_common.register_provider('llm', {
use_gzip = true,
keepalive = true,
callback = http_cb,
-- staged timeouts
connect_timeout = llm.connect_timeout,
ssl_timeout = llm.ssl_timeout,
write_timeout = llm.write_timeout,
read_timeout = llm.read_timeout,
}
rspamd_http.request(http_params)

64
src/libserver/maps/map.c

@ -1425,6 +1425,21 @@ rspamd_map_dns_callback(struct rdns_reply *reply, void *arg)
cbd->addr);
if (cbd->conn != NULL) {
/* Apply optional staged timeouts and keepalive tuning */
if (cbd->data->connect_timeout > 0 || cbd->data->ssl_timeout > 0 ||
cbd->data->write_timeout > 0 || cbd->data->read_timeout > 0) {
rspamd_http_connection_set_timeouts(cbd->conn,
cbd->data->connect_timeout,
cbd->data->ssl_timeout,
cbd->data->write_timeout,
cbd->data->read_timeout);
}
if (cbd->data->connection_ttl > 0 || cbd->data->idle_timeout > 0 || cbd->data->max_reuse > 0) {
rspamd_http_connection_set_keepalive_tuning(cbd->conn,
cbd->data->connection_ttl,
cbd->data->idle_timeout,
cbd->data->max_reuse);
}
write_http_request(cbd);
}
else {
@ -1982,7 +1997,21 @@ check:
addr);
if (cbd->conn != NULL) {
cbd->stage = http_map_http_conn;
/* Apply optional staged timeouts and keepalive tuning */
if (cbd->data->connect_timeout > 0 || cbd->data->ssl_timeout > 0 ||
cbd->data->write_timeout > 0 || cbd->data->read_timeout > 0) {
rspamd_http_connection_set_timeouts(cbd->conn,
cbd->data->connect_timeout,
cbd->data->ssl_timeout,
cbd->data->write_timeout,
cbd->data->read_timeout);
}
if (cbd->data->connection_ttl > 0 || cbd->data->idle_timeout > 0 || cbd->data->max_reuse > 0) {
rspamd_http_connection_set_keepalive_tuning(cbd->conn,
cbd->data->connection_ttl,
cbd->data->idle_timeout,
cbd->data->max_reuse);
}
write_http_request(cbd);
cbd->addr = addr;
MAP_RELEASE(cbd, "http_callback_data");
@ -2881,6 +2910,39 @@ rspamd_map_parse_backend(struct rspamd_config *cfg, const char *map_line)
}
}
/* Parse optional HTTP timeouts and keepalive tuning from global options -> maps.* block */
{
const ucl_object_t *maps_obj = ucl_object_lookup(cfg->cfg_ucl_obj, "maps");
const ucl_object_t *opt = NULL;
if (maps_obj && ucl_object_type(maps_obj) == UCL_OBJECT) {
/* Per-URL overrides: allow stanza keyed by exact URL */
const ucl_object_t *url_obj = ucl_object_lookup(maps_obj, bk->uri);
const ucl_object_t *src = url_obj ? url_obj : maps_obj;
opt = ucl_object_lookup_any(src,
"connect_timeout", "connect-timeout", NULL);
if (opt) hdata->connect_timeout = ucl_object_todouble(opt);
opt = ucl_object_lookup_any(src,
"ssl_timeout", "ssl-timeout", NULL);
if (opt) hdata->ssl_timeout = ucl_object_todouble(opt);
opt = ucl_object_lookup_any(src,
"write_timeout", "write-timeout", NULL);
if (opt) hdata->write_timeout = ucl_object_todouble(opt);
opt = ucl_object_lookup_any(src,
"read_timeout", "read-timeout", NULL);
if (opt) hdata->read_timeout = ucl_object_todouble(opt);
/* Keepalive tuning */
opt = ucl_object_lookup_any(src,
"connection_ttl", "connection-ttl", "keepalive_ttl", NULL);
if (opt) hdata->connection_ttl = ucl_object_todouble(opt);
opt = ucl_object_lookup_any(src,
"idle_timeout", "idle-timeout", "keepalive_idle", NULL);
if (opt) hdata->idle_timeout = ucl_object_todouble(opt);
opt = ucl_object_lookup_any(src,
"max_reuse", "max-reuse", "keepalive_max_reuse", NULL);
if (opt) hdata->max_reuse = (unsigned int) ucl_object_toint(opt);
}
}
hdata->cache = rspamd_mempool_alloc0_shared(cfg->cfg_pool,
sizeof(*hdata->cache));

9
src/libserver/maps/map_private.h

@ -118,6 +118,15 @@ struct http_map_data {
gboolean request_sent;
uint64_t gen;
uint16_t port;
/* Optional per-map HTTP staged timeouts */
ev_tstamp connect_timeout;
ev_tstamp ssl_timeout;
ev_tstamp write_timeout;
ev_tstamp read_timeout;
/* Optional keepalive tuning */
double connection_ttl;
double idle_timeout;
unsigned int max_reuse;
};
struct static_map_data {

30
src/plugins/lua/bimi.lua

@ -42,6 +42,11 @@ local settings_schema = lua_redis.enrich_schema({
redis_min_expiry = ts.number + ts.string / lua_util.parse_time_interval,
redis_prefix = ts.string,
enabled = ts.boolean:is_optional(),
-- New optional staged timeouts for HTTP helper
helper_connect_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
helper_ssl_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
helper_write_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
helper_read_timeout = (ts.number + ts.string / lua_util.parse_time_interval):is_optional(),
})
local function check_dmarc_policy(task)
@ -189,11 +194,11 @@ local function make_helper_request(task, domain, record, redis_server)
end
ret, _, upstream = lua_redis.redis_make_request(task,
redis_params, -- connect params
redis_key, -- hash key
true, -- is write
redis_set_cb, --callback
'PSETEX', -- command
redis_params, -- connect params
redis_key, -- hash key
true, -- is write
redis_set_cb, --callback
'PSETEX', -- command
{ redis_key, tostring(settings.redis_min_expiry * 1000.0),
ucl.to_format(d, "json-compact") })
@ -235,6 +240,11 @@ local function make_helper_request(task, domain, record, redis_server)
url = helper_url,
callback = http_helper_callback,
keepalive = true,
-- staged timeouts if configured
connect_timeout = settings.helper_connect_timeout,
ssl_timeout = settings.helper_ssl_timeout,
write_timeout = settings.helper_write_timeout,
read_timeout = settings.helper_read_timeout,
})
end
@ -282,11 +292,11 @@ local function check_bimi_vmc(task, domain, record)
-- We first check Redis and then try to use helper
ret, _, upstream = lua_redis.redis_make_request(task,
redis_params, -- connect params
redis_key, -- hash key
false, -- is write
redis_cached_cb, --callback
'GET', -- command
redis_params, -- connect params
redis_key, -- hash key
false, -- is write
redis_cached_cb, --callback
'GET', -- command
{ redis_key })
if not ret then

90
src/plugins/lua/contextal.lua

@ -12,7 +12,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
]]--
]] --
local E = {}
local N = 'contextal'
@ -58,6 +58,11 @@ local config_schema = lua_redis.enrich_schema {
http_timeout = ts.number:is_optional(),
request_ttl = ts.number:is_optional(),
submission_symbol = ts.string:is_optional(),
-- staged timeouts
connect_timeout = ts.number:is_optional(),
ssl_timeout = ts.number:is_optional(),
write_timeout = ts.number:is_optional(),
read_timeout = ts.number:is_optional(),
}
local settings = {
@ -104,11 +109,11 @@ local function process_actions(task, obj, is_cached)
local cache_obj
if (obj[1] or E).actions then
cache_obj = {[1] = {["actions"] = obj[1].actions}}
cache_obj = { [1] = { ["actions"] = obj[1].actions } }
else
local work_id = task:get_mempool():get_variable('contextal_work_id', 'string')
if work_id then
cache_obj = {[1] = {["work_id"] = work_id}}
cache_obj = { [1] = { ["work_id"] = work_id } }
else
rspamd_logger.err(task, 'no work id found in mempool')
return
@ -116,9 +121,9 @@ local function process_actions(task, obj, is_cached)
end
redis_cache.cache_set(task,
task:get_digest(),
cache_obj,
cache_context)
task:get_digest(),
cache_obj,
cache_context)
maybe_defer(task, obj)
end
@ -167,18 +172,22 @@ local function action_cb(task)
end
rspamd_http.request({
task = task,
url = settings.actions_url .. work_id,
callback = http_callback,
timeout = settings.http_timeout,
gzip = settings.gzip,
keepalive = settings.keepalive,
no_ssl_verify = settings.no_ssl_verify,
task = task,
url = settings.actions_url .. work_id,
callback = http_callback,
timeout = settings.http_timeout,
gzip = settings.gzip,
keepalive = settings.keepalive,
no_ssl_verify = settings.no_ssl_verify,
-- staged timeouts
connect_timeout = settings.connect_timeout,
ssl_timeout = settings.ssl_timeout,
write_timeout = settings.write_timeout,
read_timeout = settings.read_timeout,
})
end
local function submit(task)
local function http_callback(err, code, body, hdrs)
if err then
rspamd_logger.err(task, 'http error: %s', err)
@ -203,33 +212,38 @@ local function submit(task)
task:get_mempool():set_variable('contextal_work_id', work_id)
end
task:insert_result(settings.submission_symbol, 1.0,
string.format('work_id=%s', work_id or 'nil'))
string.format('work_id=%s', work_id or 'nil'))
if wait_request_ttl then
task:add_timer(settings.request_ttl, action_cb)
end
end
local req = {
object_data = {['data'] = task:get_content()},
object_data = { ['data'] = task:get_content() },
}
if settings.request_ttl then
req.ttl = {['data'] = tostring(settings.request_ttl)}
req.ttl = { ['data'] = tostring(settings.request_ttl) }
end
if settings.max_recursion then
req.maxrec = {['data'] = tostring(settings.max_recursion)}
req.maxrec = { ['data'] = tostring(settings.max_recursion) }
end
rspamd_http.request({
task = task,
url = settings.submit_url,
body = lua_util.table_to_multipart_body(req, static_boundary),
callback = http_callback,
headers = {
['Content-Type'] = string.format('multipart/form-data; boundary="%s"', static_boundary)
},
timeout = settings.http_timeout,
gzip = settings.gzip,
keepalive = settings.keepalive,
no_ssl_verify = settings.no_ssl_verify,
task = task,
url = settings.submit_url,
body = lua_util.table_to_multipart_body(req, static_boundary),
callback = http_callback,
headers = {
['Content-Type'] = string.format('multipart/form-data; boundary="%s"', static_boundary)
},
timeout = settings.http_timeout,
gzip = settings.gzip,
keepalive = settings.keepalive,
no_ssl_verify = settings.no_ssl_verify,
-- staged timeouts
connect_timeout = settings.connect_timeout,
ssl_timeout = settings.ssl_timeout,
write_timeout = settings.write_timeout,
read_timeout = settings.read_timeout,
})
end
@ -244,11 +258,11 @@ end
local function submit_cb(task)
if cache_context then
redis_cache.cache_get(task,
task:get_digest(),
cache_context,
settings.cache_timeout,
submit,
cache_hit
task:get_digest(),
cache_context,
settings.cache_timeout,
submit,
cache_hit
)
else
submit(task)
@ -293,10 +307,10 @@ end
redis_params = lua_redis.parse_redis_server(N)
if redis_params then
cache_context = redis_cache.create_cache_context(redis_params, {
cache_prefix = settings.cache_prefix,
cache_ttl = settings.cache_ttl,
cache_format = 'json',
cache_use_hashing = false
cache_prefix = settings.cache_prefix,
cache_ttl = settings.cache_ttl,
cache_format = 'json',
cache_use_hashing = false
})
end

15
src/plugins/lua/gpt.lua

@ -131,6 +131,11 @@ local settings = {
}
},
timeout = 10,
-- Optional staged timeouts
connect_timeout = nil,
ssl_timeout = nil,
write_timeout = nil,
read_timeout = nil,
prompt = nil,
condition = nil,
autolearn = false,
@ -744,6 +749,11 @@ local function openai_check(task, content, sel_part)
task = task,
upstream = upstream,
use_gzip = true,
-- staged timeouts
connect_timeout = settings.connect_timeout,
ssl_timeout = settings.ssl_timeout,
write_timeout = settings.write_timeout,
read_timeout = settings.read_timeout,
}
if not rspamd_http.request(http_params) then
@ -846,6 +856,11 @@ local function ollama_check(task, content, sel_part)
task = task,
upstream = upstream,
use_gzip = true,
-- staged timeouts
connect_timeout = settings.connect_timeout,
ssl_timeout = settings.ssl_timeout,
write_timeout = settings.write_timeout,
read_timeout = settings.read_timeout,
}
rspamd_http.request(http_params)

25
src/plugins/lua/metadata_exporter.lua

@ -13,7 +13,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
]]--
]] --
if confighelp then
return
@ -164,8 +164,8 @@ local function get_general_metadata(task, flatten, no_content)
scan_real = math.floor(scan_real * 1000)
if scan_real < 0 then
rspamd_logger.messagex(task,
'clock skew detected for message: %s ms real sca time (reset to 0)',
scan_real)
'clock skew detected for message: %s ms real sca time (reset to 0)',
scan_real)
scan_real = 0
end
@ -286,18 +286,18 @@ local pushers = {
local function redis_pub_cb(err)
if err then
rspamd_logger.errx(task, 'got error %s when publishing on server %s',
err, upstream:get_addr())
err, upstream:get_addr())
return maybe_defer(task, rule)
end
return true
end
ret, _, upstream = lua_redis.redis_make_request(task,
redis_params, -- connect params
nil, -- hash key
true, -- is write
redis_pub_cb, --callback
'PUBLISH', -- command
{ rule.channel, formatted } -- arguments
redis_params, -- connect params
nil, -- hash key
true, -- is write
redis_pub_cb, --callback
'PUBLISH', -- command
{ rule.channel, formatted } -- arguments
)
if not ret then
rspamd_logger.errx(task, 'error connecting to redis')
@ -346,6 +346,11 @@ local pushers = {
gzip = rule.gzip or settings.gzip,
keepalive = rule.keepalive or settings.keepalive,
no_ssl_verify = rule.no_ssl_verify or settings.no_ssl_verify,
-- staged timeouts
connect_timeout = rule.connect_timeout or settings.connect_timeout,
ssl_timeout = rule.ssl_timeout or settings.ssl_timeout,
write_timeout = rule.write_timeout or settings.write_timeout,
read_timeout = rule.read_timeout or settings.read_timeout,
})
end,
send_mail = function(task, formatted, rule, extra)

Loading…
Cancel
Save