|
|
@ -99,6 +99,8 @@ local redis_attrs = { |
|
|
log_obj = rspamd_config, |
|
|
log_obj = rspamd_config, |
|
|
resolver = rspamadm_dns_resolver, |
|
|
resolver = rspamadm_dns_resolver, |
|
|
} |
|
|
} |
|
|
|
|
|
local redis_attrs_write = lua_util.shallowcopy(redis_attrs) |
|
|
|
|
|
redis_attrs_write['is_write'] = true |
|
|
local pool |
|
|
local pool |
|
|
|
|
|
|
|
|
local function load_config(opts) |
|
|
local function load_config(opts) |
|
|
@ -481,7 +483,7 @@ local function prepare_report(opts, start_time, end_time, rep_key) |
|
|
|
|
|
|
|
|
-- Rename report key to avoid races |
|
|
-- Rename report key to avoid races |
|
|
if not opts.no_opt then |
|
|
if not opts.no_opt then |
|
|
lua_redis.request(redis_params, redis_attrs, |
|
|
|
|
|
|
|
|
lua_redis.request(redis_params, redis_attrs_write, |
|
|
{ 'RENAME', rep_key, rep_key .. '_processing' }) |
|
|
{ 'RENAME', rep_key, rep_key .. '_processing' }) |
|
|
rep_key = rep_key .. '_processing' |
|
|
rep_key = rep_key .. '_processing' |
|
|
end |
|
|
end |
|
|
@ -491,7 +493,7 @@ local function prepare_report(opts, start_time, end_time, rep_key) |
|
|
|
|
|
|
|
|
if not dmarc_record then |
|
|
if not dmarc_record then |
|
|
if not opts.no_opt then |
|
|
if not opts.no_opt then |
|
|
lua_redis.request(redis_params, redis_attrs, |
|
|
|
|
|
|
|
|
lua_redis.request(redis_params, redis_attrs_write, |
|
|
{ 'DEL', rep_key }) |
|
|
{ 'DEL', rep_key }) |
|
|
end |
|
|
end |
|
|
logger.messagex('Cannot process reports for domain %s; invalid dmarc record', reporting_domain) |
|
|
logger.messagex('Cannot process reports for domain %s; invalid dmarc record', reporting_domain) |
|
|
@ -554,7 +556,7 @@ local function prepare_report(opts, start_time, end_time, rep_key) |
|
|
lua_util.debugm(N, 'got final message: %s', message) |
|
|
lua_util.debugm(N, 'got final message: %s', message) |
|
|
|
|
|
|
|
|
if not opts.no_opt then |
|
|
if not opts.no_opt then |
|
|
lua_redis.request(redis_params, redis_attrs, |
|
|
|
|
|
|
|
|
lua_redis.request(redis_params, redis_attrs_write, |
|
|
{ 'DEL', rep_key }) |
|
|
{ 'DEL', rep_key }) |
|
|
end |
|
|
end |
|
|
|
|
|
|
|
|
@ -585,7 +587,7 @@ local function process_report_date(opts, start_time, end_time, date) |
|
|
|
|
|
|
|
|
-- Rename index key to avoid races |
|
|
-- Rename index key to avoid races |
|
|
if not opts.no_opt then |
|
|
if not opts.no_opt then |
|
|
lua_redis.request(redis_params, redis_attrs, |
|
|
|
|
|
|
|
|
lua_redis.request(redis_params, redis_attrs_write, |
|
|
{ 'RENAME', idx_key, idx_key .. '_processing' }) |
|
|
{ 'RENAME', idx_key, idx_key .. '_processing' }) |
|
|
idx_key = idx_key .. '_processing' |
|
|
idx_key = idx_key .. '_processing' |
|
|
end |
|
|
end |
|
|
@ -595,7 +597,7 @@ local function process_report_date(opts, start_time, end_time, date) |
|
|
if not ret or not results then |
|
|
if not ret or not results then |
|
|
-- Remove bad key |
|
|
-- Remove bad key |
|
|
if not opts.no_opt then |
|
|
if not opts.no_opt then |
|
|
lua_redis.request(redis_params, redis_attrs, |
|
|
|
|
|
|
|
|
lua_redis.request(redis_params, redis_attrs_write, |
|
|
{ 'DEL', idx_key }) |
|
|
{ 'DEL', idx_key }) |
|
|
end |
|
|
end |
|
|
logger.messagex('Cannot get reports for %s', date) |
|
|
logger.messagex('Cannot get reports for %s', date) |
|
|
@ -615,7 +617,7 @@ local function process_report_date(opts, start_time, end_time, date) |
|
|
lua_util.shuffle(reports) |
|
|
lua_util.shuffle(reports) |
|
|
-- Remove processed key |
|
|
-- Remove processed key |
|
|
if not opts.no_opt then |
|
|
if not opts.no_opt then |
|
|
lua_redis.request(redis_params, redis_attrs, |
|
|
|
|
|
|
|
|
lua_redis.request(redis_params, redis_attrs_write, |
|
|
{ 'DEL', idx_key }) |
|
|
{ 'DEL', idx_key }) |
|
|
end |
|
|
end |
|
|
|
|
|
|
|
|
@ -715,11 +717,11 @@ local function handler(args) |
|
|
if not opts.no_opt then |
|
|
if not opts.no_opt then |
|
|
lua_util.debugm(N, 'set last report date to %s', start_collection) |
|
|
lua_util.debugm(N, 'set last report date to %s', start_collection) |
|
|
-- Hack to avoid coroutines + async functions mess: we use async redis call here |
|
|
-- Hack to avoid coroutines + async functions mess: we use async redis call here |
|
|
redis_attrs.callback = function() |
|
|
|
|
|
|
|
|
redis_attrs_write.callback = function() |
|
|
logger.messagex('Reporting collection has finished %s dates processed, %s reports: %s completed, %s failed', |
|
|
logger.messagex('Reporting collection has finished %s dates processed, %s reports: %s completed, %s failed', |
|
|
ndates, nreports, nsuccess, nfail) |
|
|
ndates, nreports, nsuccess, nfail) |
|
|
end |
|
|
end |
|
|
lua_redis.request(redis_params, redis_attrs, |
|
|
|
|
|
|
|
|
lua_redis.request(redis_params, redis_attrs_write, |
|
|
{ 'SETEX', 'rspamd_dmarc_last_collection', dmarc_settings.reporting.keys_expire * 2, |
|
|
{ 'SETEX', 'rspamd_dmarc_last_collection', dmarc_settings.reporting.keys_expire * 2, |
|
|
tostring(start_collection) }) |
|
|
tostring(start_collection) }) |
|
|
else |
|
|
else |
|
|
|