|
|
|
@ -85,6 +85,12 @@ worker_t rspamd_proxy_worker = { |
|
|
|
RSPAMD_WORKER_SOCKET_TCP, /* TCP socket */ |
|
|
|
RSPAMD_WORKER_VER}; |
|
|
|
|
|
|
|
enum rspamd_proxy_log_tag_type { |
|
|
|
RSPAMD_PROXY_LOG_TAG_SESSION = 0, /* Use session mempool tag (default) */ |
|
|
|
RSPAMD_PROXY_LOG_TAG_QUEUE_ID, /* Use Queue-ID from client message */ |
|
|
|
RSPAMD_PROXY_LOG_TAG_NONE, /* Skip log tag passing */ |
|
|
|
}; |
|
|
|
|
|
|
|
struct rspamd_http_upstream { |
|
|
|
char *name; |
|
|
|
char *settings_id; |
|
|
|
@ -98,6 +104,7 @@ struct rspamd_http_upstream { |
|
|
|
gboolean compress; |
|
|
|
gboolean ssl; |
|
|
|
gboolean keepalive; /* Whether to use keepalive for this upstream */ |
|
|
|
enum rspamd_proxy_log_tag_type log_tag_type; |
|
|
|
ucl_object_t *extra_headers; |
|
|
|
}; |
|
|
|
|
|
|
|
@ -114,6 +121,7 @@ struct rspamd_http_mirror { |
|
|
|
gboolean compress; |
|
|
|
gboolean ssl; |
|
|
|
gboolean keepalive; /* Whether to use keepalive for this mirror */ |
|
|
|
enum rspamd_proxy_log_tag_type log_tag_type; |
|
|
|
ucl_object_t *extra_headers; |
|
|
|
}; |
|
|
|
|
|
|
|
@ -167,6 +175,8 @@ struct rspamd_proxy_ctx { |
|
|
|
/* Language detector */ |
|
|
|
struct rspamd_lang_detector *lang_det; |
|
|
|
double task_timeout; |
|
|
|
/* Default log tag type for worker */ |
|
|
|
enum rspamd_proxy_log_tag_type log_tag_type; |
|
|
|
struct rspamd_main *srv; |
|
|
|
}; |
|
|
|
|
|
|
|
@ -231,6 +241,77 @@ rspamd_proxy_quark(void) |
|
|
|
return g_quark_from_static_string("rspamd-proxy"); |
|
|
|
} |
|
|
|
|
|
|
|
static enum rspamd_proxy_log_tag_type |
|
|
|
rspamd_proxy_parse_log_tag_type(const char *str) |
|
|
|
{ |
|
|
|
if (str == NULL) { |
|
|
|
return RSPAMD_PROXY_LOG_TAG_SESSION; |
|
|
|
} |
|
|
|
|
|
|
|
if (g_ascii_strcasecmp(str, "session") == 0 || |
|
|
|
g_ascii_strcasecmp(str, "session_tag") == 0) { |
|
|
|
return RSPAMD_PROXY_LOG_TAG_SESSION; |
|
|
|
} |
|
|
|
else if (g_ascii_strcasecmp(str, "queue_id") == 0 || |
|
|
|
g_ascii_strcasecmp(str, "queue-id") == 0) { |
|
|
|
return RSPAMD_PROXY_LOG_TAG_QUEUE_ID; |
|
|
|
} |
|
|
|
else if (g_ascii_strcasecmp(str, "none") == 0 || |
|
|
|
g_ascii_strcasecmp(str, "skip") == 0) { |
|
|
|
return RSPAMD_PROXY_LOG_TAG_NONE; |
|
|
|
} |
|
|
|
|
|
|
|
/* Default to session tag for unknown values */ |
|
|
|
return RSPAMD_PROXY_LOG_TAG_SESSION; |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_proxy_add_log_tag_header(struct rspamd_http_message *msg, |
|
|
|
struct rspamd_proxy_session *session, |
|
|
|
enum rspamd_proxy_log_tag_type log_tag_type) |
|
|
|
{ |
|
|
|
const rspamd_ftok_t *queue_id_hdr; |
|
|
|
|
|
|
|
switch (log_tag_type) { |
|
|
|
case RSPAMD_PROXY_LOG_TAG_SESSION: |
|
|
|
/* Use session mempool tag (current behavior) */ |
|
|
|
rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, |
|
|
|
strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); |
|
|
|
break; |
|
|
|
|
|
|
|
case RSPAMD_PROXY_LOG_TAG_QUEUE_ID: |
|
|
|
/* Try to extract Queue-ID from client message */ |
|
|
|
if (session->client_message) { |
|
|
|
queue_id_hdr = rspamd_http_message_find_header(session->client_message, QUEUE_ID_HEADER); |
|
|
|
if (queue_id_hdr) { |
|
|
|
rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, |
|
|
|
queue_id_hdr->begin, queue_id_hdr->len); |
|
|
|
} |
|
|
|
/* If no Queue-ID found, fall back to session tag */ |
|
|
|
else { |
|
|
|
rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, |
|
|
|
strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
/* No client message, fall back to session tag */ |
|
|
|
rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, |
|
|
|
strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); |
|
|
|
} |
|
|
|
break; |
|
|
|
|
|
|
|
case RSPAMD_PROXY_LOG_TAG_NONE: |
|
|
|
/* Skip adding log tag header */ |
|
|
|
break; |
|
|
|
|
|
|
|
default: |
|
|
|
/* Fall back to session tag for unknown types */ |
|
|
|
rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, |
|
|
|
strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
static gboolean |
|
|
|
rspamd_proxy_parse_lua_parser(lua_State *L, const ucl_object_t *obj, |
|
|
|
int *ref_from, int *ref_to, GError **err) |
|
|
|
@ -399,6 +480,7 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool, |
|
|
|
up->parser_from_ref = -1; |
|
|
|
up->parser_to_ref = -1; |
|
|
|
up->timeout = ctx->timeout; |
|
|
|
up->log_tag_type = ctx->log_tag_type; /* Inherit from worker default */ |
|
|
|
|
|
|
|
elt = ucl_object_lookup(obj, "key"); |
|
|
|
if (elt != NULL) { |
|
|
|
@ -507,6 +589,11 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool, |
|
|
|
up->extra_headers); |
|
|
|
} |
|
|
|
|
|
|
|
elt = ucl_object_lookup_any(obj, "log_tag", "log_tag_type", NULL); |
|
|
|
if (elt && ucl_object_type(elt) == UCL_STRING) { |
|
|
|
up->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(elt)); |
|
|
|
} |
|
|
|
|
|
|
|
/* |
|
|
|
* Accept lua function here in form |
|
|
|
* fun :: String -> UCL |
|
|
|
@ -606,6 +693,7 @@ rspamd_proxy_parse_mirror(rspamd_mempool_t *pool, |
|
|
|
up->parser_to_ref = -1; |
|
|
|
up->parser_from_ref = -1; |
|
|
|
up->timeout = ctx->timeout; |
|
|
|
up->log_tag_type = ctx->log_tag_type; /* Inherit from worker default */ |
|
|
|
|
|
|
|
elt = ucl_object_lookup(obj, "key"); |
|
|
|
if (elt != NULL) { |
|
|
|
@ -686,6 +774,11 @@ rspamd_proxy_parse_mirror(rspamd_mempool_t *pool, |
|
|
|
up->settings_id = rspamd_mempool_strdup(pool, ucl_object_tostring(elt)); |
|
|
|
} |
|
|
|
|
|
|
|
elt = ucl_object_lookup_any(obj, "log_tag", "log_tag_type", NULL); |
|
|
|
if (elt && ucl_object_type(elt) == UCL_STRING) { |
|
|
|
up->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(elt)); |
|
|
|
} |
|
|
|
|
|
|
|
g_ptr_array_add(ctx->mirrors, up); |
|
|
|
|
|
|
|
return TRUE; |
|
|
|
@ -785,6 +878,29 @@ err: |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
|
|
|
|
static gboolean |
|
|
|
rspamd_proxy_parse_log_tag_worker_option(rspamd_mempool_t *pool, |
|
|
|
const ucl_object_t *obj, |
|
|
|
gpointer ud, |
|
|
|
struct rspamd_rcl_section *section, |
|
|
|
GError **err) |
|
|
|
{ |
|
|
|
struct rspamd_proxy_ctx *ctx; |
|
|
|
struct rspamd_rcl_struct_parser *pd = ud; |
|
|
|
|
|
|
|
ctx = pd->user_struct; |
|
|
|
|
|
|
|
if (ucl_object_type(obj) != UCL_STRING) { |
|
|
|
g_set_error(err, rspamd_proxy_quark(), 100, |
|
|
|
"log_tag_type option must be a string"); |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
|
|
|
|
ctx->log_tag_type = rspamd_proxy_parse_log_tag_type(ucl_object_tostring(obj)); |
|
|
|
|
|
|
|
return TRUE; |
|
|
|
} |
|
|
|
|
|
|
|
gpointer |
|
|
|
init_rspamd_proxy(struct rspamd_config *cfg) |
|
|
|
{ |
|
|
|
@ -810,6 +926,7 @@ init_rspamd_proxy(struct rspamd_config *cfg) |
|
|
|
(rspamd_mempool_destruct_t) rspamd_array_free_hard, ctx->cmp_refs); |
|
|
|
ctx->max_retries = DEFAULT_RETRIES; |
|
|
|
ctx->spam_header = RSPAMD_MILTER_SPAM_HEADER; |
|
|
|
ctx->log_tag_type = RSPAMD_PROXY_LOG_TAG_SESSION; /* Default to session tag */ |
|
|
|
|
|
|
|
rspamd_rcl_register_worker_option(cfg, |
|
|
|
type, |
|
|
|
@ -933,6 +1050,16 @@ init_rspamd_proxy(struct rspamd_config *cfg) |
|
|
|
0, |
|
|
|
"Use custom tempfail message"); |
|
|
|
|
|
|
|
/* We need a custom parser for log_tag_type as it's an enum */ |
|
|
|
rspamd_rcl_register_worker_option(cfg, |
|
|
|
type, |
|
|
|
"log_tag_type", |
|
|
|
rspamd_proxy_parse_log_tag_worker_option, |
|
|
|
ctx, |
|
|
|
0, |
|
|
|
0, |
|
|
|
"Log tag type: session (default), queue_id, or none"); |
|
|
|
|
|
|
|
return ctx; |
|
|
|
} |
|
|
|
|
|
|
|
@ -1587,6 +1714,9 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/* Add log tag header based on mirror's configuration */ |
|
|
|
rspamd_proxy_add_log_tag_header(msg, session, m->log_tag_type); |
|
|
|
|
|
|
|
/* Set handlers for the connection */ |
|
|
|
conn->error_handler = proxy_backend_mirror_error_handler; |
|
|
|
conn->finish_handler = proxy_backend_mirror_finish_handler; |
|
|
|
@ -1723,6 +1853,9 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/* Add log tag header based on mirror's configuration */ |
|
|
|
rspamd_proxy_add_log_tag_header(msg, session, m->log_tag_type); |
|
|
|
|
|
|
|
unsigned int http_opts = RSPAMD_HTTP_CLIENT_SIMPLE; |
|
|
|
|
|
|
|
if (m->ssl) { |
|
|
|
@ -2416,6 +2549,9 @@ proxy_send_master_message(struct rspamd_proxy_session *session) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/* Add log tag header based on backend's configuration */ |
|
|
|
rspamd_proxy_add_log_tag_header(msg, session, backend->log_tag_type); |
|
|
|
|
|
|
|
if (backend->local || |
|
|
|
rspamd_inet_address_is_local( |
|
|
|
rspamd_upstream_addr_cur( |
|
|
|
@ -2536,8 +2672,9 @@ proxy_client_finish_handler(struct rspamd_http_connection *conn, |
|
|
|
rspamd_http_message_remove_header(msg, "Keep-Alive"); |
|
|
|
rspamd_http_message_remove_header(msg, "Connection"); |
|
|
|
rspamd_http_message_remove_header(msg, "Key"); |
|
|
|
rspamd_http_message_add_header_len(msg, LOG_TAG_HEADER, session->pool->tag.uid, |
|
|
|
strnlen(session->pool->tag.uid, sizeof(session->pool->tag.uid))); |
|
|
|
|
|
|
|
/* Add log tag header based on worker's default configuration */ |
|
|
|
rspamd_proxy_add_log_tag_header(msg, session, session->ctx->log_tag_type); |
|
|
|
|
|
|
|
proxy_open_mirror_connections(session); |
|
|
|
rspamd_http_connection_reset(session->client_conn); |
|
|
|
|