Browse Source

[Project] Make it compileable again...

libev-migration
Vsevolod Stakhov 6 years ago
parent
commit
dc96f9b37f
  1. 7
      CMakeLists.txt
  2. 2
      contrib/libev/CMakeLists.txt
  3. 25
      src/client/rspamc.c
  4. 32
      src/hs_helper.c
  5. 4
      src/libutil/upstream.c
  6. 11
      src/lua/lua_config.c
  7. 5
      src/rspamadm/control.c
  8. 23
      src/rspamadm/lua_repl.c
  9. 15
      src/rspamadm/rspamadm.c
  10. 86
      src/rspamd_proxy.c

7
CMakeLists.txt

@ -864,11 +864,6 @@ CHECK_INCLUDE_FILES(readpassphrase.h HAVE_READPASSPHRASE_H)
CHECK_INCLUDE_FILES(termios.h HAVE_TERMIOS_H)
CHECK_INCLUDE_FILES(paths.h HAVE_PATHS_H)
CHECK_INCLUDE_FILES(ctype.h HAVE_CTYPE_H)
CHECK_INCLUDE_FILES(sys/sendfile.h HAVE_SYS_SENDFILE_H)
CHECK_INCLUDE_FILES(linux/falloc.h HAVE_LINUX_FALLOC_H)
CHECK_INCLUDE_FILES(sys/eventfd.h HAVE_SYS_EVENTFD_H)
CHECK_INCLUDE_FILES(aio.h HAVE_AIO_H)
CHECK_INCLUDE_FILES(libaio.h HAVE_LIBAIO_H)
CHECK_INCLUDE_FILES(unistd.h HAVE_UNISTD_H)
CHECK_INCLUDE_FILES(cpuid.h HAVE_CPUID_H)
CHECK_INCLUDE_FILES(dirent.h HAVE_DIRENT_H)
@ -1185,7 +1180,6 @@ LIST(APPEND RSPAMD_REQUIRED_LIBRARIES "${LUA_LIBRARY}")
LIST(APPEND RSPAMD_REQUIRED_LIBRARIES ucl)
LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rdns)
LIST(APPEND RSPAMD_REQUIRED_LIBRARIES ottery)
LIST(APPEND RSPAMD_REQUIRED_LIBRARIES event)
LIST(APPEND RSPAMD_REQUIRED_LIBRARIES xxhash)
IF(GLIB_COMPAT)
@ -1219,6 +1213,7 @@ LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-hiredis)
LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-actrie)
LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-t1ha)
LIST(APPEND RSPAMD_REQUIRED_LIBRARIES rspamd-ev)
IF(ENABLE_CLANG_PLUGIN MATCHES "ON")
ADD_SUBDIRECTORY(clang-plugin)

2
contrib/libev/CMakeLists.txt

@ -55,7 +55,7 @@ ENDIF()
CONFIGURE_FILE(config.h.in libev-config.h)
ADD_LIBRARY(rspamd-libev STATIC ${LIBEVSRC})
ADD_LIBRARY(rspamd-ev STATIC ${LIBEVSRC})
ADD_DEFINITIONS("-DEV_CONFIG_H=\"${CMAKE_CURRENT_BINARY_DIR}/libev-config.h\""
-DEV_MULTIPLICITY=1
-DEV_USE_FLOOR=1

25
src/client/rspamc.c

@ -1829,7 +1829,7 @@ rspamc_process_dir (struct ev_loop *ev_base, struct rspamc_command *cmd,
if (cur_req >= max_requests) {
cur_req = 0;
/* Wait for completion */
event_base_loop (ev_base, 0);
ev_loop (ev_base, 0);
}
}
}
@ -1840,7 +1840,7 @@ rspamc_process_dir (struct ev_loop *ev_base, struct rspamc_command *cmd,
}
closedir (d);
event_base_loop (ev_base, 0);
ev_loop (ev_base, 0);
}
@ -1863,7 +1863,7 @@ main (gint argc, gchar **argv, gchar **env)
GPid cld;
struct rspamc_command *cmd;
FILE *in = NULL;
struct ev_loop *ev_base;
struct ev_loop *event_loop;
struct stat st;
struct sigaction sigpipe_act;
gchar **exclude_pattern;
@ -1884,6 +1884,7 @@ main (gint argc, gchar **argv, gchar **env)
npatterns = 0;
while (exclude_pattern && *exclude_pattern) {
exclude_pattern ++;
npatterns ++;
}
@ -1902,7 +1903,7 @@ main (gint argc, gchar **argv, gchar **env)
}
rspamd_init_libs ();
ev_base = event_base_new ();
event_loop = ev_default_loop (EVFLAG_SIGNALFD);
struct rspamd_http_context_cfg http_config;
@ -1911,7 +1912,7 @@ main (gint argc, gchar **argv, gchar **env)
http_config.kp_cache_size_server = 0;
http_config.user_agent = user_agent;
http_ctx = rspamd_http_context_create_config (&http_config,
ev_base, NULL);
event_loop, NULL);
/* Ignore sigpipe */
sigemptyset (&sigpipe_act.sa_mask);
@ -1972,10 +1973,10 @@ main (gint argc, gchar **argv, gchar **env)
if (start_argc == argc) {
/* Do command without input or with stdin */
if (empty_input) {
rspamc_process_input (ev_base, cmd, NULL, "empty", kwattrs);
rspamc_process_input (event_loop, cmd, NULL, "empty", kwattrs);
}
else {
rspamc_process_input (ev_base, cmd, in, "stdin", kwattrs);
rspamc_process_input (event_loop, cmd, in, "stdin", kwattrs);
}
}
else {
@ -1990,7 +1991,7 @@ main (gint argc, gchar **argv, gchar **env)
}
if (S_ISDIR (st.st_mode)) {
/* Directories are processed with a separate limit */
rspamc_process_dir (ev_base, cmd, argv[i], kwattrs);
rspamc_process_dir (event_loop, cmd, argv[i], kwattrs);
cur_req = 0;
}
else {
@ -1999,24 +2000,24 @@ main (gint argc, gchar **argv, gchar **env)
fprintf (stderr, "cannot open file %s\n", argv[i]);
exit (EXIT_FAILURE);
}
rspamc_process_input (ev_base, cmd, in, argv[i], kwattrs);
rspamc_process_input (event_loop, cmd, in, argv[i], kwattrs);
cur_req++;
fclose (in);
}
if (cur_req >= max_requests) {
cur_req = 0;
/* Wait for completion */
event_base_loop (ev_base, 0);
ev_loop (event_loop, 0);
}
}
}
if (cmd->cmd == RSPAMC_COMMAND_FUZZY_DELHASH) {
rspamc_process_input (ev_base, cmd, NULL, "hashes", kwattrs);
rspamc_process_input (event_loop, cmd, NULL, "hashes", kwattrs);
}
}
event_base_loop (ev_base, 0);
ev_loop (event_loop, 0);
g_queue_free_full (kwattrs, rspamc_kwattr_free);

32
src/hs_helper.c

@ -47,7 +47,7 @@ static const guint64 rspamd_hs_helper_magic = 0x22d310157a2288a0ULL;
struct hs_helper_ctx {
guint64 magic;
/* Events base */
struct ev_loop *ev_base;
struct ev_loop *event_loop;
/* DNS resolver */
struct rspamd_dns_resolver *resolver;
/* Config */
@ -57,7 +57,7 @@ struct hs_helper_ctx {
gboolean loaded;
gdouble max_time;
gdouble recompile_time;
struct event recompile_timer;
ev_timer recompile_timer;
};
static gpointer
@ -216,7 +216,7 @@ rspamd_rs_compile (struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
* XXX: now we just sleep for 5 seconds to ensure that
*/
if (!ctx->loaded) {
sleep (5);
ev_sleep (5.0);
ctx->loaded = TRUE;
}
@ -226,7 +226,7 @@ rspamd_rs_compile (struct hs_helper_ctx *ctx, struct rspamd_worker *worker,
sizeof (srv_cmd.cmd.hs_loaded.cache_dir));
srv_cmd.cmd.hs_loaded.forced = forced;
rspamd_srv_send_command (worker, ctx->ev_base, &srv_cmd, -1, NULL, NULL);
rspamd_srv_send_command (worker, ctx->event_loop, &srv_cmd, -1, NULL, NULL);
return TRUE;
}
@ -258,26 +258,23 @@ rspamd_hs_helper_reload (struct rspamd_main *rspamd_main,
}
static void
rspamd_hs_helper_timer (gint fd, short what, gpointer ud)
rspamd_hs_helper_timer (EV_P_ ev_timer *w, int revents)
{
struct rspamd_worker *worker = ud;
struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
struct hs_helper_ctx *ctx;
struct timeval tv;
double tim;
ctx = worker->ctx;
tim = rspamd_time_jitter (ctx->recompile_time, 0);
double_to_tv (tim, &tv);
event_del (&ctx->recompile_timer);
w->repeat = tim;
rspamd_rs_compile (ctx, worker, FALSE);
event_add (&ctx->recompile_timer, &tv);
ev_timer_again (EV_A_ w);
}
static void
start_hs_helper (struct rspamd_worker *worker)
{
struct hs_helper_ctx *ctx = worker->ctx;
struct timeval tv;
double tim;
ctx->cfg = worker->srv->cfg;
@ -289,7 +286,7 @@ start_hs_helper (struct rspamd_worker *worker)
ctx->hs_dir = RSPAMD_DBDIR "/";
}
ctx->ev_base = rspamd_prepare_worker (worker,
ctx->event_loop = rspamd_prepare_worker (worker,
"hs_helper",
NULL);
@ -301,13 +298,12 @@ start_hs_helper (struct rspamd_worker *worker)
rspamd_control_worker_add_cmd_handler (worker, RSPAMD_CONTROL_RECOMPILE,
rspamd_hs_helper_reload, ctx);
event_set (&ctx->recompile_timer, -1, EV_TIMEOUT, rspamd_hs_helper_timer,
worker);
event_base_set (ctx->ev_base, &ctx->recompile_timer);
ctx->recompile_timer.data = worker;
tim = rspamd_time_jitter (ctx->recompile_time, 0);
double_to_tv (tim, &tv);
event_add (&ctx->recompile_timer, &tv);
event_base_loop (ctx->ev_base, 0);
ev_timer_init (&ctx->recompile_timer, rspamd_hs_helper_timer, tim, 0.0);
ev_timer_start (ctx->event_loop, &ctx->recompile_timer);
ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
rspamd_log_close (worker->srv->logger, TRUE);

4
src/libutil/upstream.c

@ -913,9 +913,7 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
/* Here the upstreams list is already locked */
RSPAMD_UPSTREAM_LOCK (up->lock);
if (rspamd_event_pending (&up->ev, EV_TIMEOUT)) {
event_del (&up->ev);
}
ev_timer_stop (up->ctx->event_loop, &up->ev);
g_ptr_array_add (ups->alive, up);
up->active_idx = ups->alive->len - 1;
RSPAMD_UPSTREAM_UNLOCK (up->lock);

11
src/lua/lua_config.c

@ -3050,7 +3050,7 @@ static void lua_periodic_callback_finish (struct thread_entry *thread, int ret);
static void lua_periodic_callback_error (struct thread_entry *thread, int ret, const char *msg);
struct rspamd_lua_periodic {
struct ev_loop *ev_base;
struct ev_loop *event_loop;
struct rspamd_config *cfg;
lua_State *L;
gdouble timeout;
@ -3082,7 +3082,7 @@ lua_periodic_callback (struct ev_loop *loop, ev_timer *w, int revents)
*pcfg = cfg;
pev_base = lua_newuserdata (L, sizeof (*pev_base));
rspamd_lua_setclass (L, "rspamd{ev_base}", -1);
*pev_base = periodic->ev_base;
*pev_base = periodic->event_loop;
lua_thread_call (thread, 2);
}
@ -3097,6 +3097,7 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret)
L = thread->lua_state;
ev_now_update (periodic->event_loop);
#ifdef HAVE_EVENT_NO_CACHE_TIME_FUNC
event_base_update_cache_time (periodic->ev_base);
#endif
@ -3119,11 +3120,11 @@ lua_periodic_callback_finish (struct thread_entry *thread, int ret)
}
periodic->ev.repeat = timeout;
ev_timer_again (periodic->ev_base, &periodic->ev);
ev_timer_again (periodic->event_loop, &periodic->ev);
}
else {
luaL_unref (L, LUA_REGISTRYINDEX, periodic->cbref);
ev_timer_stop (periodic->ev_base, &periodic->ev);
ev_timer_stop (periodic->event_loop, &periodic->ev);
g_free (periodic);
}
}
@ -3163,7 +3164,7 @@ lua_config_add_periodic (lua_State *L)
periodic->timeout = timeout;
periodic->L = L;
periodic->cfg = cfg;
periodic->ev_base = ev_base;
periodic->event_loop = ev_base;
periodic->need_jitter = need_jitter;
lua_pushvalue (L, 4);
periodic->cbref = luaL_ref (L, LUA_REGISTRYINDEX);

5
src/rspamadm/control.c

@ -111,7 +111,6 @@ rspamd_control_finish_handler (struct rspamd_http_connection *conn,
const gchar *body;
gsize body_len;
struct rspamadm_control_cbdata *cbdata = conn->ud;
struct timeval exit_tv;
body = rspamd_http_message_get_body (msg, &body_len);
parser = ucl_parser_new (0);
@ -157,9 +156,7 @@ rspamd_control_finish_handler (struct rspamd_http_connection *conn,
}
end:
exit_tv.tv_sec = 0;
exit_tv.tv_usec = 0;
event_base_loopexit (rspamd_main->event_loop, &exit_tv);
ev_break (rspamd_main->event_loop, EVBREAK_ALL);
return 0;
}

23
src/rspamadm/lua_repl.c

@ -515,7 +515,7 @@ rspamadm_lua_run_repl (lua_State *L)
{
gchar *input;
gboolean is_multiline = FALSE;
GString *tb;
GString *tb = NULL;
guint i;
for (;;) {
@ -591,15 +591,16 @@ struct rspamadm_lua_repl_session {
};
static void
rspamadm_lua_accept_cb (gint fd, short what, void *arg)
rspamadm_lua_accept_cb (EV_P_ ev_io *w, int revents)
{
struct rspamadm_lua_repl_context *ctx = arg;
struct rspamadm_lua_repl_context *ctx =
(struct rspamadm_lua_repl_context *)w->data;
rspamd_inet_addr_t *addr;
struct rspamadm_lua_repl_session *session;
gint nfd;
if ((nfd =
rspamd_accept_from_socket (fd, &addr, NULL)) == -1) {
rspamd_accept_from_socket (w->fd, &addr, NULL, NULL)) == -1) {
rspamd_fprintf (stderr, "accept failed: %s", strerror (errno));
return;
}
@ -808,7 +809,7 @@ rspamadm_lua (gint argc, gchar **argv, const struct rspamadm_command *cmd)
ctx = g_malloc0 (sizeof (*ctx));
http = rspamd_http_router_new (rspamadm_lua_error_handler,
rspamadm_lua_finish_handler,
NULL,
0.0,
NULL,
rspamd_main->http_ctx);
ctx->L = L;
@ -822,19 +823,17 @@ rspamadm_lua (gint argc, gchar **argv, const struct rspamadm_command *cmd)
fd = rspamd_inet_address_listen (addr, SOCK_STREAM, TRUE);
if (fd != -1) {
struct event *ev;
static ev_io ev;
ev = g_malloc0 (sizeof (*ev));
event_set (ev, fd, EV_READ|EV_PERSIST, rspamadm_lua_accept_cb,
ctx);
event_base_set (ev_base, ev);
event_add (ev, NULL);
ev.data = ctx;
ev_io_init (&ev, rspamadm_lua_accept_cb, fd, EV_READ);
ev_io_start (ev_base, &ev);
rspamd_printf ("listen on %s\n",
rspamd_inet_address_to_string_pretty (addr));
}
}
event_base_loop (ev_base, 0);
ev_loop (ev_base, 0);
exit (EXIT_SUCCESS);
}

15
src/rspamadm/rspamadm.c

@ -379,15 +379,6 @@ main (gint argc, gchar **argv, gchar **env)
rspamd_main->server_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
"rspamadm");
#ifdef HAVE_EVENT_NO_CACHE_TIME_FLAG
struct event_config *ev_cfg;
ev_cfg = event_config_new ();
event_config_set_flag (ev_cfg, EVENT_BASE_FLAG_NO_CACHE_TIME);
rspamd_main->ev_base = event_base_new_with_config (ev_cfg);
#else
rspamd_main->event_loop = event_init ();
#endif
rspamadm_fill_internal_commands (all_commands);
help_command.command_data = all_commands;
@ -565,10 +556,8 @@ main (gint argc, gchar **argv, gchar **env)
cmd->run (0, NULL, cmd);
}
event_base_loopexit (rspamd_main->event_loop, NULL);
#ifdef HAVE_EVENT_NO_CACHE_TIME_FLAG
event_config_free (ev_cfg);
#endif
ev_break (rspamd_main->event_loop, EVBREAK_ALL);
REF_RELEASE (rspamd_main->cfg);
rspamd_log_close (rspamd_main->logger, TRUE);

86
src/rspamd_proxy.c

@ -86,7 +86,6 @@ struct rspamd_http_upstream {
struct upstream_list *u;
struct rspamd_cryptobox_pubkey *key;
gdouble timeout;
struct timeval io_tv;
gint parser_from_ref;
gint parser_to_ref;
gboolean local;
@ -101,7 +100,6 @@ struct rspamd_http_mirror {
struct rspamd_cryptobox_pubkey *key;
gdouble prob;
gdouble timeout;
struct timeval io_tv;
gint parser_from_ref;
gint parser_to_ref;
gboolean local;
@ -113,14 +111,13 @@ static const guint64 rspamd_rspamd_proxy_magic = 0xcdeb4fd1fc351980ULL;
struct rspamd_proxy_ctx {
guint64 magic;
/* Events base */
struct ev_loop *ev_base;
struct ev_loop *event_loop;
/* DNS resolver */
struct rspamd_dns_resolver *resolver;
/* Config */
struct rspamd_config *cfg;
/* END OF COMMON PART */
gdouble timeout;
struct timeval io_tv;
/* Encryption key for clients */
struct rspamd_cryptobox_keypair *key;
/* HTTP context */
@ -174,8 +171,8 @@ struct rspamd_proxy_backend_connection {
ucl_object_t *results;
const gchar *err;
struct rspamd_proxy_session *s;
struct timeval *io_tv;
gint backend_sock;
ev_tstamp timeout;
enum rspamd_backend_flags flags;
gint parser_from_ref;
gint parser_to_ref;
@ -464,8 +461,6 @@ rspamd_proxy_parse_upstream (rspamd_mempool_t *pool,
rspamd_lua_add_ref_dtor (L, pool, up->parser_to_ref);
}
double_to_tv (up->timeout, &up->io_tv);
g_hash_table_insert (ctx->upstreams, up->name, up);
return TRUE;
@ -617,8 +612,6 @@ rspamd_proxy_parse_mirror (rspamd_mempool_t *pool,
up->settings_id = rspamd_mempool_strdup (pool, ucl_object_tostring (elt));
}
double_to_tv (up->timeout, &up->io_tv);
g_ptr_array_add (ctx->mirrors, up);
return TRUE;
@ -1144,8 +1137,6 @@ proxy_request_decompress (struct rspamd_http_message *msg)
rspamd_http_message_set_body_from_fstring_steal (msg, body);
rspamd_http_message_remove_header (msg, "Compression");
}
return;
}
static struct rspamd_proxy_session *
@ -1350,7 +1341,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
sizeof (*bk_conn));
bk_conn->s = session;
bk_conn->name = m->name;
bk_conn->io_tv = &m->io_tv;
bk_conn->timeout = m->timeout;
bk_conn->up = rspamd_upstream_get (m->u,
RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
@ -1415,7 +1406,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
msg->method = HTTP_GET;
rspamd_http_connection_write_message_shared (bk_conn->backend_conn,
msg, NULL, NULL, bk_conn,
bk_conn->io_tv);
bk_conn->timeout);
}
else {
if (session->fname) {
@ -1442,7 +1433,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
rspamd_http_connection_write_message (bk_conn->backend_conn,
msg, NULL, NULL, bk_conn,
bk_conn->io_tv);
bk_conn->timeout);
}
g_ptr_array_add (session->mirror_conns, bk_conn);
@ -1468,7 +1459,7 @@ proxy_client_write_error (struct rspamd_proxy_session *session, gint code,
reply->status = rspamd_fstring_new_init (status, strlen (status));
rspamd_http_connection_write_message (session->client_conn,
reply, NULL, NULL, session,
&session->ctx->io_tv);
session->ctx->timeout);
}
}
@ -1566,7 +1557,7 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
else {
rspamd_http_connection_write_message (session->client_conn,
msg, NULL, NULL, session,
bk_conn->io_tv);
bk_conn->timeout);
}
return 0;
@ -1625,7 +1616,7 @@ rspamd_proxy_scan_self_reply (struct rspamd_task *task)
NULL,
ctype,
session,
NULL);
0);
}
}
@ -1666,7 +1657,7 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
msg = session->client_message;
task = rspamd_task_new (session->worker, session->ctx->cfg,
session->pool, session->ctx->lang_det,
session->ctx->ev_base);
session->ctx->event_loop);
task->flags |= RSPAMD_TASK_FLAG_MIME;
task->sock = -1;
@ -1711,23 +1702,18 @@ rspamd_proxy_self_scan (struct rspamd_proxy_session *session)
/* Set global timeout for the task */
if (session->ctx->default_upstream->timeout > 0.0) {
struct timeval task_tv;
task->timeout_ev.data = task;
ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
session->ctx->default_upstream->timeout, 0.0);
ev_timer_start (task->event_loop, &task->timeout_ev);
event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
task);
event_base_set (session->ctx->ev_base, &task->timeout_ev);
double_to_tv (session->ctx->default_upstream->timeout, &task_tv);
event_add (&task->timeout_ev, &task_tv);
}
else if (session->ctx->has_self_scan) {
if (session->ctx->cfg->task_timeout > 0) {
struct timeval task_tv;
event_set (&task->timeout_ev, -1, EV_TIMEOUT, rspamd_task_timeout,
task);
event_base_set (session->ctx->ev_base, &task->timeout_ev);
double_to_tv (session->ctx->cfg->task_timeout, &task_tv);
event_add (&task->timeout_ev, &task_tv);
task->timeout_ev.data = task;
ev_timer_init (&task->timeout_ev, rspamd_task_timeout,
session->ctx->cfg->task_timeout, 0.0);
ev_timer_start (task->event_loop, &task->timeout_ev);
}
}
@ -1783,7 +1769,7 @@ retry:
session->master_conn->up = rspamd_upstream_get (backend->u,
RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
session->master_conn->io_tv = &backend->io_tv;
session->master_conn->timeout = backend->timeout;
if (session->master_conn->up == NULL) {
msg_err_session ("cannot select upstream for %s",
@ -1853,7 +1839,7 @@ retry:
rspamd_http_connection_write_message_shared (
session->master_conn->backend_conn,
msg, NULL, NULL, session->master_conn,
session->master_conn->io_tv);
session->master_conn->timeout);
}
else {
if (session->fname) {
@ -1881,7 +1867,7 @@ retry:
rspamd_http_connection_write_message (
session->master_conn->backend_conn,
msg, NULL, NULL, session->master_conn,
session->master_conn->io_tv);
session->master_conn->timeout);
}
}
@ -2031,9 +2017,9 @@ proxy_milter_error_handler (gint fd,
}
static void
proxy_accept_socket (gint fd, short what, void *arg)
proxy_accept_socket (EV_P_ ev_io *w, int revents)
{
struct rspamd_worker *worker = (struct rspamd_worker *) arg;
struct rspamd_worker *worker = (struct rspamd_worker *)w->data;
struct rspamd_proxy_ctx *ctx;
rspamd_inet_addr_t *addr;
struct rspamd_proxy_session *session;
@ -2042,7 +2028,8 @@ proxy_accept_socket (gint fd, short what, void *arg)
ctx = worker->ctx;
if ((nfd =
rspamd_accept_from_socket (fd, &addr, worker->accept_events)) == -1) {
rspamd_accept_from_socket (w->fd, &addr,
rspamd_worker_throttle_accept_events, worker->accept_events)) == -1) {
msg_warn ("accept failed: %s", strerror (errno));
return;
}
@ -2086,7 +2073,7 @@ proxy_accept_socket (gint fd, short what, void *arg)
rspamd_http_connection_read_message_shared (session->client_conn,
session,
&ctx->io_tv);
session->ctx->timeout);
}
else {
msg_info_session ("accepted milter connection from %s port %d",
@ -2110,9 +2097,9 @@ proxy_accept_socket (gint fd, short what, void *arg)
}
#endif
rspamd_milter_handle_socket (nfd, NULL,
rspamd_milter_handle_socket (nfd, 0.0,
session->pool,
ctx->ev_base,
ctx->event_loop,
proxy_milter_finish_handler,
proxy_milter_error_handler,
session);
@ -2153,30 +2140,29 @@ start_rspamd_proxy (struct rspamd_worker *worker)
struct rspamd_proxy_ctx *ctx = worker->ctx;
ctx->cfg = worker->srv->cfg;
ctx->ev_base = rspamd_prepare_worker (worker, "rspamd_proxy",
ctx->event_loop = rspamd_prepare_worker (worker, "rspamd_proxy",
proxy_accept_socket);
ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
ctx->ev_base,
ctx->event_loop,
worker->srv->cfg);
double_to_tv (ctx->timeout, &ctx->io_tv);
rspamd_map_watch (worker->srv->cfg, ctx->ev_base, ctx->resolver, worker, 0);
rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
ctx->ev_base, ctx->resolver->r);
ctx->event_loop, ctx->resolver->r);
ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->ev_base,
ctx->http_ctx = rspamd_http_context_create (ctx->cfg, ctx->event_loop,
ctx->cfg->ups_ctx);
if (ctx->has_self_scan) {
/* Additional initialisation needed */
rspamd_worker_init_scanner (worker, ctx->ev_base, ctx->resolver,
rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
&ctx->lang_det);
}
if (worker->srv->cfg->enable_sessions_cache) {
ctx->sessions_cache = rspamd_worker_session_cache_new (worker,
ctx->ev_base);
ctx->event_loop);
}
ctx->milter_ctx.spam_header = ctx->spam_header;
@ -2188,11 +2174,11 @@ start_rspamd_proxy (struct rspamd_worker *worker)
ctx->milter_ctx.cfg = ctx->cfg;
rspamd_milter_init_library (&ctx->milter_ctx);
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->ev_base,
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);
adjust_upstreams_limits (ctx);
event_base_loop (ctx->ev_base, 0);
ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
if (ctx->has_self_scan) {

Loading…
Cancel
Save