Browse Source

[Project] Implement more flexible http timeouts

Issue: #5611
pull/5614/head
Vsevolod Stakhov 4 weeks ago
parent
commit
642a8addb3
No known key found for this signature in database GPG Key ID: 7647B6790081437
  1. 126
      src/libserver/http/http_connection.c
  2. 28
      src/libserver/http/http_connection.h
  3. 117
      src/libserver/http/http_context.c
  4. 11
      src/libserver/http/http_context.h
  5. 50
      src/lua/lua_http.c

126
src/libserver/http/http_connection.c

@ -69,13 +69,27 @@ struct rspamd_http_connection_private {
struct http_parser parser;
struct http_parser_settings parser_cb;
struct rspamd_io_ev ev;
ev_tstamp timeout;
ev_tstamp timeout; /* legacy/global timeout (fallback) */
/* Staged timeouts (seconds); 0 means use ctx/defaults */
ev_tstamp connect_timeout;
ev_tstamp ssl_timeout;
ev_tstamp write_timeout;
ev_tstamp read_timeout;
struct rspamd_http_message *msg;
struct iovec *out;
unsigned int outlen;
enum rspamd_http_priv_flags flags;
gsize wr_pos;
gsize wr_total;
/* Keepalive tuning and telemetry */
double created_ts; /* when connection object was created */
double last_used_ts; /* last time returned to pool */
unsigned int reuse_count; /* number of reuses from keepalive */
double ka_ttl_override; /* per-request TTL */
double ka_idle_override; /* per-request idle timeout */
unsigned int ka_max_reuse_override; /* per-request max reuse */
/* Internal state */
bool first_write_done;
};
static const rspamd_ftok_t key_header = {
@ -731,11 +745,11 @@ rspamd_http_simple_client_helper(struct rspamd_http_connection *conn)
if (conn->opts & RSPAMD_HTTP_CLIENT_SHARED) {
rspamd_http_connection_read_message_shared(conn, conn->ud,
conn->priv->timeout);
(priv->read_timeout > 0 ? priv->read_timeout : conn->priv->timeout));
}
else {
rspamd_http_connection_read_message(conn, conn->ud,
conn->priv->timeout);
(priv->read_timeout > 0 ? priv->read_timeout : conn->priv->timeout));
}
if (priv->msg) {
@ -853,6 +867,8 @@ call_finish_handler:
}
else {
/* Plan read message */
/* Switch to read stage timeout */
priv->first_write_done = true;
rspamd_http_simple_client_helper(conn);
}
}
@ -1130,6 +1146,18 @@ rspamd_http_connection_new_common(struct rspamd_http_context *ctx,
priv = g_malloc0(sizeof(struct rspamd_http_connection_private));
conn->priv = priv;
priv->ctx = ctx;
/* Initialize staged timeouts and keepalive telemetry */
priv->connect_timeout = ctx->config.connect_timeout;
priv->ssl_timeout = ctx->config.ssl_timeout;
priv->write_timeout = ctx->config.write_timeout;
priv->read_timeout = ctx->config.read_timeout;
priv->created_ts = rspamd_get_ticks(FALSE);
priv->last_used_ts = 0;
priv->reuse_count = 0;
priv->ka_ttl_override = 0;
priv->ka_idle_override = 0;
priv->ka_max_reuse_override = 0;
priv->first_write_done = false;
priv->flags = priv_flags;
if (type == RSPAMD_HTTP_SERVER) {
@ -1513,7 +1541,8 @@ rspamd_http_connection_read_message_common(struct rspamd_http_connection *conn,
priv->flags |= RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
}
priv->timeout = timeout;
/* Use read-stage timeout override if set; else fallback */
priv->timeout = (priv->read_timeout > 0 ? priv->read_timeout : timeout);
priv->header = NULL;
priv->buf = g_malloc0(sizeof(*priv->buf));
REF_INIT_RETAIN(priv->buf, rspamd_http_privbuf_dtor);
@ -2045,7 +2074,8 @@ rspamd_http_connection_write_message_common(struct rspamd_http_connection *conn,
conn->ud = ud;
priv->msg = msg;
priv->timeout = timeout;
/* Use write-stage timeout override if set */
priv->timeout = (priv->write_timeout > 0 ? priv->write_timeout : timeout);
priv->header = NULL;
priv->buf = g_malloc0(sizeof(*priv->buf));
@ -2387,8 +2417,10 @@ if (conn->opts & RSPAMD_HTTP_CLIENT_SSL) {
conn->log_tag);
g_assert(priv->ssl != NULL);
/* Use ssl_timeout for handshake if provided */
ev_tstamp ssl_to = (priv->ssl_timeout > 0 ? priv->ssl_timeout : (priv->connect_timeout > 0 ? priv->connect_timeout : priv->timeout));
if (!rspamd_ssl_connect_fd(priv->ssl, conn->fd, host, &priv->ev,
priv->timeout, rspamd_http_event_handler,
ssl_to, rspamd_http_event_handler,
rspamd_http_ssl_err_handler, conn)) {
err = g_error_new(HTTP_ERROR, 400,
@ -2415,7 +2447,9 @@ if (conn->opts & RSPAMD_HTTP_CLIENT_SSL) {
else {
rspamd_ev_watcher_init(&priv->ev, conn->fd, EV_WRITE,
rspamd_http_event_handler, conn);
rspamd_ev_watcher_start(priv->ctx->event_loop, &priv->ev, priv->timeout);
/* Use connect_timeout on initial EV_WRITE stage if provided */
ev_tstamp start_to = (priv->connect_timeout > 0 ? priv->connect_timeout : priv->timeout);
rspamd_ev_watcher_start(priv->ctx->event_loop, &priv->ev, start_to);
}
return TRUE;
@ -2654,3 +2688,81 @@ void rspamd_http_connection_disable_encryption(struct rspamd_http_connection *co
priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
}
}
void rspamd_http_connection_set_timeouts(struct rspamd_http_connection *conn,
ev_tstamp connect_timeout,
ev_tstamp ssl_timeout,
ev_tstamp write_timeout,
ev_tstamp read_timeout)
{
struct rspamd_http_connection_private *priv = conn->priv;
if (connect_timeout > 0) {
priv->connect_timeout = connect_timeout;
}
if (ssl_timeout > 0) {
priv->ssl_timeout = ssl_timeout;
}
if (write_timeout > 0) {
priv->write_timeout = write_timeout;
}
if (read_timeout > 0) {
priv->read_timeout = read_timeout;
}
}
void rspamd_http_connection_set_keepalive_tuning(struct rspamd_http_connection *conn,
double connection_ttl,
double idle_timeout,
unsigned int max_reuse)
{
struct rspamd_http_connection_private *priv = conn->priv;
if (connection_ttl > 0) {
priv->ka_ttl_override = connection_ttl;
}
if (idle_timeout > 0) {
priv->ka_idle_override = idle_timeout;
}
if (max_reuse > 0) {
priv->ka_max_reuse_override = max_reuse;
}
}
void rspamd_http_connection_keepalive_note_put(struct rspamd_http_connection *conn,
double now_ts)
{
struct rspamd_http_connection_private *priv = conn->priv;
priv->last_used_ts = now_ts;
}
void rspamd_http_connection_keepalive_note_reuse(struct rspamd_http_connection *conn)
{
struct rspamd_http_connection_private *priv = conn->priv;
priv->reuse_count++;
}
gboolean rspamd_http_connection_keepalive_is_valid(struct rspamd_http_connection *conn,
double now_ts,
double default_ttl,
unsigned int default_max_reuse)
{
struct rspamd_http_connection_private *priv = conn->priv;
double ttl = (priv->ka_ttl_override > 0 ? priv->ka_ttl_override : default_ttl);
unsigned int max_reuse = (priv->ka_max_reuse_override > 0 ? priv->ka_max_reuse_override : default_max_reuse);
if (ttl > 0 && rspamd_get_ticks(FALSE) - priv->created_ts > ttl) {
return FALSE;
}
if (max_reuse > 0 && priv->reuse_count >= max_reuse) {
return FALSE;
}
return TRUE;
}
double rspamd_http_connection_keepalive_idle_timeout(struct rspamd_http_connection *conn,
double default_idle)
{
struct rspamd_http_connection_private *priv = conn->priv;
return (priv->ka_idle_override > 0 ? priv->ka_idle_override : default_idle);
}

28
src/libserver/http/http_connection.h

@ -186,6 +186,34 @@ struct rspamd_http_connection *rspamd_http_connection_new_client(
unsigned opts,
rspamd_inet_addr_t *addr);
/**
* Set per-request staged timeouts. Pass 0 to keep defaults.
*/
void rspamd_http_connection_set_timeouts(struct rspamd_http_connection *conn,
ev_tstamp connect_timeout,
ev_tstamp ssl_timeout,
ev_tstamp write_timeout,
ev_tstamp read_timeout);
/**
* Set per-request keepalive tuning. Pass 0 to keep defaults/disable limit.
*/
void rspamd_http_connection_set_keepalive_tuning(struct rspamd_http_connection *conn,
double connection_ttl,
double idle_timeout,
unsigned int max_reuse);
/* Helpers used by keepalive context */
void rspamd_http_connection_keepalive_note_put(struct rspamd_http_connection *conn,
double now_ts);
void rspamd_http_connection_keepalive_note_reuse(struct rspamd_http_connection *conn);
gboolean rspamd_http_connection_keepalive_is_valid(struct rspamd_http_connection *conn,
double now_ts,
double default_ttl,
unsigned int default_max_reuse);
double rspamd_http_connection_keepalive_idle_timeout(struct rspamd_http_connection *conn,
double default_idle);
/**
* Creates an ordinary client connection using ready file descriptor (ignores proxy)
* @param ctx

117
src/libserver/http/http_context.c

@ -101,6 +101,16 @@ rspamd_http_context_new_default(struct rspamd_config *cfg,
ctx->config.user_agent = default_user_agent;
ctx->config.keepalive_interval = default_keepalive_interval;
ctx->config.server_hdr = default_server_hdr;
/* New defaults (disabled -> 0 to preserve legacy single-timeout behavior) */
ctx->config.connect_timeout = 0.0;
ctx->config.ssl_timeout = 0.0;
ctx->config.write_timeout = 0.0;
ctx->config.read_timeout = 0.0;
ctx->config.keepalive_pool_size = 0;
ctx->config.keepalive_connection_ttl = 0.0;
ctx->config.keepalive_idle_timeout = 0.0; /* fall back to keepalive_interval */
ctx->config.keepalive_max_reuse = 0; /* unlimited */
ctx->config.keepalive_eviction_policy = 1; /* LRU */
ctx->ups_ctx = ups_ctx;
if (cfg) {
@ -270,6 +280,58 @@ rspamd_http_context_create(struct rspamd_config *cfg,
if (http_proxy) {
ctx->config.http_proxy = ucl_object_tostring(http_proxy);
}
/* New staged timeouts */
const ucl_object_t *connect_timeout = ucl_object_lookup(client_obj, "connect_timeout");
if (connect_timeout) {
ctx->config.connect_timeout = ucl_object_todouble(connect_timeout);
}
const ucl_object_t *ssl_timeout = ucl_object_lookup(client_obj, "ssl_timeout");
if (ssl_timeout) {
ctx->config.ssl_timeout = ucl_object_todouble(ssl_timeout);
}
const ucl_object_t *write_timeout = ucl_object_lookup(client_obj, "write_timeout");
if (write_timeout) {
ctx->config.write_timeout = ucl_object_todouble(write_timeout);
}
const ucl_object_t *read_timeout = ucl_object_lookup(client_obj, "read_timeout");
if (read_timeout) {
ctx->config.read_timeout = ucl_object_todouble(read_timeout);
}
/* Keepalive/pooling */
const ucl_object_t *ka_pool_size = ucl_object_lookup(client_obj, "pool_size");
if (ka_pool_size) {
ctx->config.keepalive_pool_size = ucl_object_toint(ka_pool_size);
}
const ucl_object_t *ka_ttl = ucl_object_lookup(client_obj, "connection_ttl");
if (ka_ttl) {
ctx->config.keepalive_connection_ttl = ucl_object_todouble(ka_ttl);
}
const ucl_object_t *ka_idle = ucl_object_lookup(client_obj, "idle_timeout");
if (ka_idle) {
ctx->config.keepalive_idle_timeout = ucl_object_todouble(ka_idle);
}
const ucl_object_t *ka_reuse = ucl_object_lookup(client_obj, "max_reuse");
if (ka_reuse) {
ctx->config.keepalive_max_reuse = ucl_object_toint(ka_reuse);
}
const ucl_object_t *ka_evict = ucl_object_lookup(client_obj, "eviction_policy");
if (ka_evict) {
/* map string to int policy if string provided */
if (ucl_object_type(ka_evict) == UCL_STRING) {
const char *pol = ucl_object_tostring(ka_evict);
if (g_ascii_strcasecmp(pol, "lifo") == 0) {
ctx->config.keepalive_eviction_policy = 0;
}
else {
ctx->config.keepalive_eviction_policy = 1;
}
}
else {
ctx->config.keepalive_eviction_policy = ucl_object_toint(ka_evict);
}
}
}
server_obj = ucl_object_lookup(http_obj, "server");
@ -429,7 +491,16 @@ rspamd_http_context_check_keepalive(struct rspamd_http_context *ctx,
int err;
socklen_t len = sizeof(int);
cbd = g_queue_pop_head(conns);
if (ctx->config.keepalive_eviction_policy == 1) {
/* LRU: reuse the tail (oldest) */
GList *tail = g_queue_peek_tail_link(conns);
cbd = (struct rspamd_http_keepalive_cbdata *) tail->data;
g_queue_delete_link(conns, tail);
}
else {
/* LIFO: reuse the head (most recent) */
cbd = g_queue_pop_head(conns);
}
rspamd_ev_watcher_stop(ctx->event_loop, &cbd->ev);
conn = cbd->conn;
g_free(cbd);
@ -453,6 +524,22 @@ rspamd_http_context_check_keepalive(struct rspamd_http_context *ctx,
return NULL;
}
/* Enforce ttl/reuse limits */
double now_ts = ev_now(ctx->event_loop);
if (!rspamd_http_connection_keepalive_is_valid(conn, now_ts,
ctx->config.keepalive_connection_ttl,
ctx->config.keepalive_max_reuse)) {
msg_debug_http_context("evict expired keepalive element %s (%s, ssl=%d)",
rspamd_inet_address_to_string_pretty(phk->addr),
phk->host,
(int) phk->is_ssl);
rspamd_http_connection_unref(conn);
return NULL;
}
/* Track reuse */
rspamd_http_connection_keepalive_note_reuse(conn);
msg_debug_http_context("reused keepalive element %s (%s, ssl=%d), %d connections queued",
rspamd_inet_address_to_string_pretty(phk->addr),
phk->host,
@ -656,14 +743,36 @@ void rspamd_http_context_push_keepalive(struct rspamd_http_context *ctx,
cbdata->ctx = ctx;
conn->finished = FALSE;
/* Enforce pool size (evict tail if exceeded) */
if (ctx->config.keepalive_pool_size > 0) {
while ((unsigned) cbdata->queue->length > ctx->config.keepalive_pool_size) {
GList *last = g_queue_peek_tail_link(cbdata->queue);
if (last) {
struct rspamd_http_keepalive_cbdata *to_evict = (struct rspamd_http_keepalive_cbdata *) last->data;
g_queue_delete_link(cbdata->queue, last);
rspamd_ev_watcher_stop(cbdata->ctx->event_loop, &to_evict->ev);
rspamd_http_connection_unref(to_evict->conn);
g_free(to_evict);
}
else {
break;
}
}
}
/* Note time of putting into pool */
rspamd_http_connection_keepalive_note_put(conn, ev_now(event_loop));
rspamd_ev_watcher_init(&cbdata->ev, conn->fd, EV_READ,
rspamd_http_keepalive_handler,
cbdata);
rspamd_ev_watcher_start(event_loop, &cbdata->ev, timeout);
/* Idle timeout override if provided */
double idle_to = rspamd_http_connection_keepalive_idle_timeout(conn, ctx->config.keepalive_idle_timeout > 0 ? ctx->config.keepalive_idle_timeout : timeout);
rspamd_ev_watcher_start(event_loop, &cbdata->ev, idle_to);
msg_debug_http_context("push keepalive element %s (%s), %d connections queued, %.1f timeout",
rspamd_inet_address_to_string_pretty(cbdata->conn->keepalive_hash_key->addr),
cbdata->conn->keepalive_hash_key->host,
cbdata->queue->length,
timeout);
}
idle_to);
}

11
src/libserver/http/http_context.h

@ -41,6 +41,17 @@ struct rspamd_http_context_cfg {
const char *user_agent;
const char *http_proxy;
const char *server_hdr;
/* Client-side staged timeouts (seconds) */
double connect_timeout; /* TCP connect */
double ssl_timeout; /* SSL handshake */
double write_timeout; /* Request write */
double read_timeout; /* Response read */
/* Keep-alive/pool tuning */
unsigned int keepalive_pool_size; /* max conns per key */
double keepalive_connection_ttl; /* absolute TTL */
double keepalive_idle_timeout; /* idle timeout */
unsigned int keepalive_max_reuse; /* reuse limit */
int keepalive_eviction_policy; /* 0=LIFO,1=LRU */
};
/**

50
src/lua/lua_http.c

@ -506,7 +506,7 @@ lua_http_resume_handler(struct rspamd_http_connection *conn,
}
static gboolean
lua_http_make_connection(struct lua_http_cbdata *cbd)
lua_http_make_connection(lua_State *L, struct lua_http_cbdata *cbd)
{
rspamd_inet_address_set_port(cbd->addr, cbd->msg->port);
unsigned http_opts = RSPAMD_HTTP_CLIENT_SIMPLE;
@ -574,6 +574,43 @@ lua_http_make_connection(struct lua_http_cbdata *cbd)
cbd->flags |= RSPAMD_LUA_HTTP_FLAG_RESOLVED;
}
/* Optional per-request tuning from table (if present) */
if (lua_type(L, 1) == LUA_TTABLE) {
double connect_timeout = 0, ssl_timeout = 0, write_timeout = 0, read_timeout = 0;
double connection_ttl = 0, idle_timeout = 0;
unsigned int max_reuse = 0;
lua_pushstring(L, "connect_timeout");
lua_gettable(L, 1);
if (lua_type(L, -1) == LUA_TNUMBER) connect_timeout = lua_tonumber(L, -1);
lua_pop(L, 1);
lua_pushstring(L, "ssl_timeout");
lua_gettable(L, 1);
if (lua_type(L, -1) == LUA_TNUMBER) ssl_timeout = lua_tonumber(L, -1);
lua_pop(L, 1);
lua_pushstring(L, "write_timeout");
lua_gettable(L, 1);
if (lua_type(L, -1) == LUA_TNUMBER) write_timeout = lua_tonumber(L, -1);
lua_pop(L, 1);
lua_pushstring(L, "read_timeout");
lua_gettable(L, 1);
if (lua_type(L, -1) == LUA_TNUMBER) read_timeout = lua_tonumber(L, -1);
lua_pop(L, 1);
rspamd_http_connection_set_timeouts(cbd->conn, connect_timeout, ssl_timeout, write_timeout, read_timeout);
lua_pushstring(L, "connection_ttl");
lua_gettable(L, 1);
if (lua_type(L, -1) == LUA_TNUMBER) connection_ttl = lua_tonumber(L, -1);
lua_pop(L, 1);
lua_pushstring(L, "idle_timeout");
lua_gettable(L, 1);
if (lua_type(L, -1) == LUA_TNUMBER) idle_timeout = lua_tonumber(L, -1);
lua_pop(L, 1);
lua_pushstring(L, "max_reuse");
lua_gettable(L, 1);
if (lua_type(L, -1) == LUA_TNUMBER) max_reuse = lua_tointeger(L, -1);
lua_pop(L, 1);
rspamd_http_connection_set_keepalive_tuning(cbd->conn, connection_ttl, idle_timeout, max_reuse);
}
if (cbd->task) {
cbd->conn->log_tag = cbd->task->task_pool->tag.uid;
@ -635,7 +672,7 @@ lua_http_dns_handler(struct rdns_reply *reply, gpointer ud)
}
else {
REF_RETAIN(cbd);
if (!lua_http_make_connection(cbd)) {
if (!lua_http_make_connection(NULL, cbd)) {
lua_http_push_error(cbd, "unable to make connection to the host");
if (cbd->ref.refcount > 1) {
@ -713,6 +750,13 @@ lua_http_push_headers(lua_State *L, struct rspamd_http_message *msg)
* @param {boolean} keepalive enable keep-alive pool
* @param {string} user for HTTP authentication
* @param {string} password for HTTP authentication, only if "user" present
* @param {number} connect_timeout optional TCP connect timeout (seconds)
* @param {number} ssl_timeout optional SSL handshake timeout (seconds)
* @param {number} write_timeout optional request write timeout (seconds)
* @param {number} read_timeout optional response read timeout (seconds)
* @param {number} connection_ttl optional absolute keep-alive connection TTL (seconds)
* @param {number} idle_timeout optional keep-alive idle timeout override (seconds)
* @param {number} max_reuse optional keep-alive max reuse count per connection
* @return {boolean} `true`, in **async** mode, if a request has been successfully scheduled. If this value is `false` then some error occurred, the callback thus will not be called.
* @return In **sync** mode `string|nil, nil|table` In sync mode error message if any and response as table: `int` _code_, `string` _content_ and `table` _headers_ (header -> value)
*/
@ -1276,7 +1320,7 @@ lua_http_request(lua_State *L)
gboolean ret;
REF_RETAIN(cbd);
ret = lua_http_make_connection(cbd);
ret = lua_http_make_connection(L, cbd);
if (!ret) {
if (cbd->up) {

Loading…
Cancel
Save