|
|
@ -86,7 +86,7 @@ local default_opts = { |
|
|
|
cache_probes = 5, -- Number of times to check a pending key |
|
|
|
cache_format = "json", -- Serialization format |
|
|
|
cache_hash_len = 16, -- Number of hex symbols to use for hashed keys |
|
|
|
cache_use_hashing = true -- Whether to hash keys by default |
|
|
|
cache_use_hashing = false -- Whether to hash keys by default |
|
|
|
} |
|
|
|
|
|
|
|
-- Create a hash of the key using the configured length |
|
|
@ -119,7 +119,7 @@ local function get_cache_key(raw_key, cache_context, force_hashing) |
|
|
|
end |
|
|
|
|
|
|
|
-- Create a caching context with the provided options |
|
|
|
local function create_cache_context(redis_params, opts) |
|
|
|
local function create_cache_context(redis_params, opts, module_name) |
|
|
|
if not redis_params then |
|
|
|
return nil, "Redis parameters must be provided" |
|
|
|
end |
|
|
@ -129,6 +129,7 @@ local function create_cache_context(redis_params, opts) |
|
|
|
|
|
|
|
-- Process and merge configuration options |
|
|
|
cache_context.opts = lua_util.override_defaults(default_opts, opts) |
|
|
|
cache_context.N = module_name or N |
|
|
|
|
|
|
|
-- Register Redis prefix |
|
|
|
lua_redis.register_prefix(cache_context.opts.cache_prefix, |
|
|
@ -150,7 +151,7 @@ local function create_cache_context(redis_params, opts) |
|
|
|
|
|
|
|
-- Set serialization and deserialization functions |
|
|
|
if cache_context.opts.cache_format == "messagepack" then |
|
|
|
lua_util.debugm(N, rspamd_config, "using messagepack for serialization") |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "using messagepack for serialization") |
|
|
|
|
|
|
|
cache_context.encode = function(data) |
|
|
|
return ucl.to_format(data, 'msgpack') |
|
|
@ -160,14 +161,14 @@ local function create_cache_context(redis_params, opts) |
|
|
|
local ucl_parser = ucl.parser() |
|
|
|
local ok, ucl_err = ucl_parser:parse_text(raw_data, 'messagepack') |
|
|
|
if not ok then |
|
|
|
lua_util.debugm(N, rspamd_config, "failed to parse messagepack data: %s", ucl_err) |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "failed to parse messagepack data: %s", ucl_err) |
|
|
|
return nil |
|
|
|
end |
|
|
|
return ucl_parser:get_object() |
|
|
|
end |
|
|
|
else |
|
|
|
-- Default to JSON |
|
|
|
lua_util.debugm(N, rspamd_config, "using json for serialization") |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "using json for serialization") |
|
|
|
|
|
|
|
cache_context.encode = function(data) |
|
|
|
return ucl.to_format(data, 'json') |
|
|
@ -177,47 +178,47 @@ local function create_cache_context(redis_params, opts) |
|
|
|
local ucl_parser = ucl.parser() |
|
|
|
local ok, ucl_err = ucl_parser:parse_text(raw_data) |
|
|
|
if not ok then |
|
|
|
lua_util.debugm(N, rspamd_config, "failed to parse json data: %s", ucl_err) |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "failed to parse json data: %s", ucl_err) |
|
|
|
return nil |
|
|
|
end |
|
|
|
return ucl_parser:get_object() |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
lua_util.debugm(N, rspamd_config, "cache context created: %s", cache_context.opts) |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "cache context created: %s", cache_context.opts) |
|
|
|
return cache_context |
|
|
|
end |
|
|
|
|
|
|
|
-- Encode data for storage in Redis with proper formatting |
|
|
|
local function encode_data(data, cache_context) |
|
|
|
lua_util.debugm(N, rspamd_config, "encoding data using %s format", cache_context.opts.cache_format) |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "encoding data using %s format", cache_context.opts.cache_format) |
|
|
|
return cache_context.encode(data) |
|
|
|
end |
|
|
|
|
|
|
|
-- Decode data from Redis with proper formatting |
|
|
|
local function decode_data(data, cache_context) |
|
|
|
if not data then |
|
|
|
lua_util.debugm(N, rspamd_config, "cannot decode nil data") |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "cannot decode nil data") |
|
|
|
return nil |
|
|
|
end |
|
|
|
lua_util.debugm(N, rspamd_config, "decoding data using %s format", cache_context.opts.cache_format) |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "decoding data using %s format", cache_context.opts.cache_format) |
|
|
|
return cache_context.decode(data) |
|
|
|
end |
|
|
|
|
|
|
|
-- Check if a value is a PENDING marker and extract its details |
|
|
|
local function parse_pending_value(value, cache_context) |
|
|
|
if type(value) ~= 'string' then |
|
|
|
lua_util.debugm(N, rspamd_config, "value is not a string, cannot be a pending marker") |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "value is not a string, cannot be a pending marker") |
|
|
|
return nil |
|
|
|
end |
|
|
|
|
|
|
|
-- Check if the value starts with PENDING: |
|
|
|
if string.sub(value, 1, 8) ~= "PENDING:" then |
|
|
|
lua_util.debugm(N, rspamd_config, "value doesn't start with PENDING: prefix") |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "value doesn't start with PENDING: prefix") |
|
|
|
return nil |
|
|
|
end |
|
|
|
|
|
|
|
lua_util.debugm(N, rspamd_config, "found PENDING marker, extracting data") |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "found PENDING marker, extracting data") |
|
|
|
local pending_data = string.sub(value, 9) |
|
|
|
return decode_data(pending_data, cache_context) |
|
|
|
end |
|
|
@ -231,7 +232,7 @@ local function create_pending_marker(timeout, cache_context) |
|
|
|
timestamp = os.time() |
|
|
|
} |
|
|
|
|
|
|
|
lua_util.debugm(N, rspamd_config, "creating PENDING marker for host %s, timeout %s", |
|
|
|
lua_util.debugm(cache_context.N, rspamd_config, "creating PENDING marker for host %s, timeout %s", |
|
|
|
hostname, timeout) |
|
|
|
|
|
|
|
return "PENDING:" .. encode_data(pending_data, cache_context) |
|
|
@ -245,52 +246,52 @@ local function cache_get(task, key, cache_context, timeout, callback_uncached, c |
|
|
|
end |
|
|
|
|
|
|
|
local full_key = cache_context.opts.cache_prefix .. "_" .. get_cache_key(key, cache_context, false) |
|
|
|
lua_util.debugm(N, task, "cache lookup for key: %s (%s)", key, full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "cache lookup for key: %s (%s)", key, full_key) |
|
|
|
|
|
|
|
-- Function to check a pending key |
|
|
|
local function check_pending(pending_info) |
|
|
|
local probe_count = 0 |
|
|
|
local probe_interval = timeout / (cache_context.opts.cache_probes or 5) |
|
|
|
|
|
|
|
lua_util.debugm(N, task, "setting up probes for pending key %s, interval: %s seconds", |
|
|
|
lua_util.debugm(cache_context.N, task, "setting up probes for pending key %s, interval: %s seconds", |
|
|
|
full_key, probe_interval) |
|
|
|
|
|
|
|
-- Set up a timer to probe the key |
|
|
|
local function probe_key() |
|
|
|
probe_count = probe_count + 1 |
|
|
|
lua_util.debugm(N, task, "probe #%s/%s for pending key %s", |
|
|
|
lua_util.debugm(cache_context.N, task, "probe #%s/%s for pending key %s", |
|
|
|
probe_count, cache_context.opts.cache_probes, full_key) |
|
|
|
|
|
|
|
if probe_count >= cache_context.opts.cache_probes then |
|
|
|
logger.infox(task, "maximum probes reached for key %s, considering it failed", full_key) |
|
|
|
lua_util.debugm(N, task, "maximum probes reached for key %s, giving up", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "maximum probes reached for key %s, giving up", full_key) |
|
|
|
callback_data(task, "timeout waiting for pending key", nil) |
|
|
|
return |
|
|
|
end |
|
|
|
|
|
|
|
lua_util.debugm(N, task, "probing redis for key %s", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "probing redis for key %s", full_key) |
|
|
|
lua_redis.redis_make_request(task, cache_context.redis_params, key, false, |
|
|
|
function(err, data) |
|
|
|
if err then |
|
|
|
logger.errx(task, "redis error while probing key %s: %s", full_key, err) |
|
|
|
lua_util.debugm(N, task, "redis error during probe: %s, retrying later", err) |
|
|
|
lua_util.debugm(cache_context.N, task, "redis error during probe: %s, retrying later", err) |
|
|
|
task:add_timer(probe_interval, probe_key) |
|
|
|
return |
|
|
|
end |
|
|
|
|
|
|
|
if not data or type(data) == 'userdata' then |
|
|
|
lua_util.debugm(N, task, "pending key %s disappeared, calling uncached handler", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "pending key %s disappeared, calling uncached handler", full_key) |
|
|
|
callback_uncached(task) |
|
|
|
return |
|
|
|
end |
|
|
|
|
|
|
|
local pending = parse_pending_value(data, cache_context) |
|
|
|
if pending then |
|
|
|
lua_util.debugm(N, task, "key %s still pending (host: %s), retrying later", |
|
|
|
lua_util.debugm(cache_context.N, task, "key %s still pending (host: %s), retrying later", |
|
|
|
full_key, pending.hostname) |
|
|
|
task:add_timer(probe_interval, probe_key) |
|
|
|
else |
|
|
|
lua_util.debugm(N, task, "pending key %s resolved to actual data", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "pending key %s resolved to actual data", full_key) |
|
|
|
callback_data(task, nil, decode_data(data, cache_context)) |
|
|
|
end |
|
|
|
end, |
|
|
@ -299,38 +300,38 @@ local function cache_get(task, key, cache_context, timeout, callback_uncached, c |
|
|
|
end |
|
|
|
|
|
|
|
-- Start the first probe after the initial probe interval |
|
|
|
lua_util.debugm(N, task, "scheduling first probe for %s in %s seconds", |
|
|
|
lua_util.debugm(cache_context.N, task, "scheduling first probe for %s in %s seconds", |
|
|
|
full_key, probe_interval) |
|
|
|
task:add_timer(probe_interval, probe_key) |
|
|
|
end |
|
|
|
|
|
|
|
-- Initial cache lookup |
|
|
|
lua_util.debugm(N, task, "making initial redis GET request for key: %s", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "making initial redis GET request for key: %s", full_key) |
|
|
|
lua_redis.redis_make_request(task, cache_context.redis_params, key, false, |
|
|
|
function(err, data) |
|
|
|
if err then |
|
|
|
logger.errx(task, "redis error looking up key %s: %s", full_key, err) |
|
|
|
lua_util.debugm(N, task, "redis error: %s, calling uncached handler", err) |
|
|
|
lua_util.debugm(cache_context.N, task, "redis error: %s, calling uncached handler", err) |
|
|
|
callback_uncached(task) |
|
|
|
return |
|
|
|
end |
|
|
|
|
|
|
|
if not data or type(data) == 'userdata' then |
|
|
|
-- Key not found, set pending and call the uncached callback |
|
|
|
lua_util.debugm(N, task, "key %s not found in cache, creating pending marker", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "key %s not found in cache, creating pending marker", full_key) |
|
|
|
local pending_marker = create_pending_marker(timeout, cache_context) |
|
|
|
|
|
|
|
lua_util.debugm(N, task, "setting pending marker for key %s with TTL %s", |
|
|
|
lua_util.debugm(cache_context.N, task, "setting pending marker for key %s with TTL %s", |
|
|
|
full_key, timeout * 2) |
|
|
|
lua_redis.redis_make_request(task, cache_context.redis_params, key, true, |
|
|
|
function(set_err, set_data) |
|
|
|
if set_err then |
|
|
|
logger.errx(task, "redis error setting pending marker for %s: %s", full_key, set_err) |
|
|
|
lua_util.debugm(N, task, "failed to set pending marker: %s", set_err) |
|
|
|
lua_util.debugm(cache_context.N, task, "failed to set pending marker: %s", set_err) |
|
|
|
else |
|
|
|
lua_util.debugm(N, task, "successfully set pending marker for %s", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "successfully set pending marker for %s", full_key) |
|
|
|
end |
|
|
|
lua_util.debugm(N, task, "calling uncached handler for %s", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "calling uncached handler for %s", full_key) |
|
|
|
callback_uncached(task) |
|
|
|
end, |
|
|
|
'SETEX', { full_key, tostring(timeout * 2), pending_marker } |
|
|
@ -341,26 +342,26 @@ local function cache_get(task, key, cache_context, timeout, callback_uncached, c |
|
|
|
|
|
|
|
if pending then |
|
|
|
-- Key is being processed by another worker |
|
|
|
lua_util.debugm(N, task, "key %s is pending on host %s, waiting for result", |
|
|
|
lua_util.debugm(cache_context.N, task, "key %s is pending on host %s, waiting for result", |
|
|
|
full_key, pending.hostname) |
|
|
|
check_pending(pending) |
|
|
|
else |
|
|
|
-- Extend TTL and return data |
|
|
|
lua_util.debugm(N, task, "found cached data for key %s, extending TTL to %s", |
|
|
|
lua_util.debugm(cache_context.N, task, "found cached data for key %s, extending TTL to %s", |
|
|
|
full_key, cache_context.opts.cache_ttl) |
|
|
|
lua_redis.redis_make_request(task, cache_context.redis_params, key, true, |
|
|
|
function(expire_err, _) |
|
|
|
if expire_err then |
|
|
|
logger.errx(task, "redis error extending TTL for %s: %s", full_key, expire_err) |
|
|
|
lua_util.debugm(N, task, "failed to extend TTL: %s", expire_err) |
|
|
|
lua_util.debugm(cache_context.N, task, "failed to extend TTL: %s", expire_err) |
|
|
|
else |
|
|
|
lua_util.debugm(N, task, "successfully extended TTL for %s", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "successfully extended TTL for %s", full_key) |
|
|
|
end |
|
|
|
end, |
|
|
|
'EXPIRE', { full_key, tostring(cache_context.opts.cache_ttl) } |
|
|
|
) |
|
|
|
|
|
|
|
lua_util.debugm(N, task, "returning cached data for key %s", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "returning cached data for key %s", full_key) |
|
|
|
callback_data(task, nil, decode_data(data, cache_context)) |
|
|
|
end |
|
|
|
end |
|
|
@ -379,20 +380,20 @@ local function cache_set(task, key, data, cache_context) |
|
|
|
end |
|
|
|
|
|
|
|
local full_key = cache_context.opts.cache_prefix .. "_" .. get_cache_key(key, cache_context, false) |
|
|
|
lua_util.debugm(N, task, "caching data for key: %s (%s) with TTL: %s", |
|
|
|
lua_util.debugm(cache_context.N, task, "caching data for key: %s (%s) with TTL: %s", |
|
|
|
full_key, key, cache_context.opts.cache_ttl) |
|
|
|
|
|
|
|
local encoded_data = encode_data(data, cache_context) |
|
|
|
|
|
|
|
-- Store the data with expiration |
|
|
|
lua_util.debugm(N, task, "making redis SETEX request for key: %s", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "making redis SETEX request for key: %s", full_key) |
|
|
|
return lua_redis.redis_make_request(task, cache_context.redis_params, key, true, |
|
|
|
function(err, result) |
|
|
|
if err then |
|
|
|
logger.errx(task, "redis error setting cached data for %s: %s", full_key, err) |
|
|
|
lua_util.debugm(N, task, "failed to cache data: %s", err) |
|
|
|
lua_util.debugm(cache_context.N, task, "failed to cache data: %s", err) |
|
|
|
else |
|
|
|
lua_util.debugm(N, task, "successfully cached data for key %s", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "successfully cached data for key %s", full_key) |
|
|
|
end |
|
|
|
end, |
|
|
|
'SETEX', { full_key, tostring(cache_context.opts.cache_ttl), encoded_data } |
|
|
@ -407,16 +408,16 @@ local function cache_del(task, key, cache_context) |
|
|
|
end |
|
|
|
|
|
|
|
local full_key = cache_context.opts.cache_prefix .. "_" .. get_cache_key(key, cache_context, false) |
|
|
|
lua_util.debugm(N, task, "deleting cache key: %s", full_key) |
|
|
|
lua_util.debugm(cache_context.N, task, "deleting cache key: %s", full_key) |
|
|
|
|
|
|
|
return lua_redis.redis_make_request(task, cache_context.redis_params, key, true, |
|
|
|
function(err, result) |
|
|
|
if err then |
|
|
|
logger.errx(task, "redis error deleting cache key %s: %s", full_key, err) |
|
|
|
lua_util.debugm(N, task, "failed to delete cache key: %s", err) |
|
|
|
lua_util.debugm(cache_context.N, task, "failed to delete cache key: %s", err) |
|
|
|
else |
|
|
|
local count = tonumber(result) or 0 |
|
|
|
lua_util.debugm(N, task, "successfully deleted cache key %s (%s keys removed)", |
|
|
|
lua_util.debugm(cache_context.N, task, "successfully deleted cache key %s (%s keys removed)", |
|
|
|
full_key, count) |
|
|
|
end |
|
|
|
end, |
|
|
@ -426,7 +427,7 @@ end |
|
|
|
|
|
|
|
-- Export the API functions |
|
|
|
---[[[ |
|
|
|
-- @function lua_cache.create_cache_context(redis_params, opts) |
|
|
|
-- @function lua_cache.create_cache_context(redis_params, opts, module_name) |
|
|
|
-- Creates a Redis caching context with specified parameters and options |
|
|
|
-- @param {table} redis_params Redis connection parameters (required) |
|
|
|
-- @param {table} opts Optional configuration parameters: |
|
|
|