Browse Source

[Rework] Further isolation of the controller's functions

pull/3150/head
Vsevolod Stakhov 6 years ago
parent
commit
63800059e8
  1. 251
      src/controller.c
  2. 278
      src/libserver/worker_util.c
  3. 23
      src/libserver/worker_util.h
  4. 17
      src/rspamd_proxy.c
  5. 11
      src/worker.c

251
src/controller.c

@ -103,7 +103,6 @@ INIT_LOG_MODULE(controller)
#define COLOR_REJECT "#CB4B4B"
#define COLOR_TOTAL "#9440ED"
static const ev_tstamp rrd_update_time = 1.0;
static const guint64 rspamd_controller_ctx_magic = 0xf72697805e6941faULL;
extern void fuzzy_stat_command (struct rspamd_task *task);
@ -172,9 +171,7 @@ struct rspamd_controller_worker_ctx {
/* Local keypair */
gpointer key;
ev_timer rrd_event;
struct rspamd_rrd_file *rrd;
ev_timer save_stats_event;
struct rspamd_lang_detector *lang_det;
gdouble task_timeout;
};
@ -3104,191 +3101,6 @@ rspamd_controller_accept_socket (EV_P_ ev_io *w, int revents)
rspamd_http_router_handle_socket (ctx->http, nfd, session);
}
static void
rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
{
struct rspamd_controller_worker_ctx *ctx =
(struct rspamd_controller_worker_ctx *)w->data;
struct rspamd_stat *stat;
GArray ar;
gdouble points[METRIC_ACTION_MAX];
GError *err = NULL;
guint i;
g_assert (ctx->rrd != NULL);
stat = ctx->srv->stat;
for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i ++) {
points[i] = stat->actions_stat[i];
}
ar.data = (gchar *)points;
ar.len = sizeof (points);
if (!rspamd_rrd_add_record (ctx->rrd, &ar, rspamd_get_calendar_ticks (),
&err)) {
msg_err_ctx ("cannot update rrd file: %e", err);
g_error_free (err);
}
/* Plan new event */
ev_timer_again (ctx->event_loop, &ctx->rrd_event);
}
static void
rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main,
struct rspamd_config *cfg)
{
struct ucl_parser *parser;
ucl_object_t *obj;
const ucl_object_t *elt, *subelt;
struct rspamd_stat *stat, stat_copy;
gint i;
if (cfg->stats_file == NULL) {
return;
}
if (access (cfg->stats_file, R_OK) == -1) {
msg_err_config ("cannot load controller stats from %s: %s",
cfg->stats_file, strerror (errno));
return;
}
parser = ucl_parser_new (0);
if (!ucl_parser_add_file (parser, cfg->stats_file)) {
msg_err_config ("cannot parse controller stats from %s: %s",
cfg->stats_file, ucl_parser_get_error (parser));
ucl_parser_free (parser);
return;
}
obj = ucl_parser_get_object (parser);
ucl_parser_free (parser);
stat = rspamd_main->stat;
memcpy (&stat_copy, stat, sizeof (stat_copy));
elt = ucl_object_lookup (obj, "scanned");
if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
stat_copy.messages_scanned = ucl_object_toint (elt);
}
elt = ucl_object_lookup (obj, "learned");
if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
stat_copy.messages_learned = ucl_object_toint (elt);
}
elt = ucl_object_lookup (obj, "actions");
if (elt != NULL) {
for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
subelt = ucl_object_lookup (elt, rspamd_action_to_str (i));
if (subelt && ucl_object_type (subelt) == UCL_INT) {
stat_copy.actions_stat[i] = ucl_object_toint (subelt);
}
}
}
elt = ucl_object_lookup (obj, "connections_count");
if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
stat_copy.connections_count = ucl_object_toint (elt);
}
elt = ucl_object_lookup (obj, "control_connections_count");
if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
stat_copy.control_connections_count = ucl_object_toint (elt);
}
ucl_object_unref (obj);
memcpy (stat, &stat_copy, sizeof (stat_copy));
}
static void
rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main,
struct rspamd_config *cfg)
{
struct rspamd_stat *stat;
ucl_object_t *top, *sub;
struct ucl_emitter_functions *efuncs;
gint i, fd;
gchar fpath[PATH_MAX], *tmpfile;
if (cfg->stats_file == NULL) {
return;
}
rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file);
fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644);
if (fd == -1) {
msg_err_config ("cannot open for writing controller stats from %s: %s",
fpath, strerror (errno));
return;
}
stat = rspamd_main->stat;
top = ucl_object_typed_new (UCL_OBJECT);
ucl_object_insert_key (top, ucl_object_fromint (
stat->messages_scanned), "scanned", 0, false);
ucl_object_insert_key (top, ucl_object_fromint (
stat->messages_learned), "learned", 0, false);
if (stat->messages_scanned > 0) {
sub = ucl_object_typed_new (UCL_OBJECT);
for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
ucl_object_insert_key (sub,
ucl_object_fromint (stat->actions_stat[i]),
rspamd_action_to_str (i), 0, false);
}
ucl_object_insert_key (top, sub, "actions", 0, false);
}
ucl_object_insert_key (top,
ucl_object_fromint (stat->connections_count),
"connections", 0, false);
ucl_object_insert_key (top,
ucl_object_fromint (stat->control_connections_count),
"control_connections", 0, false);
efuncs = ucl_object_emit_fd_funcs (fd);
if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
efuncs, NULL)) {
msg_err_config ("cannot write stats to %s: %s",
fpath, strerror (errno));
unlink (fpath);
}
else {
if (rename (fpath, cfg->stats_file) == -1) {
msg_err_config ("cannot rename stats from %s to %s: %s",
fpath, cfg->stats_file, strerror (errno));
}
}
ucl_object_unref (top);
close (fd);
ucl_object_emit_funcs_free (efuncs);
}
static void
rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
{
struct rspamd_controller_worker_ctx *ctx =
(struct rspamd_controller_worker_ctx *)w->data;
rspamd_controller_store_saved_stats (ctx->srv, ctx->cfg);
ev_timer_again (EV_A_ w);
}
static void
rspamd_controller_password_sane (struct rspamd_controller_worker_ctx *ctx,
const gchar *password, const gchar *type)
@ -3558,12 +3370,6 @@ lua_csession_send_string (lua_State *L)
return 0;
}
void
rspamd_controller_on_terminate (struct rspamd_worker *worker)
{
rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg);
}
static void
rspamd_plugin_cbdata_dtor (gpointer p)
{
@ -3678,7 +3484,6 @@ start_controller_worker (struct rspamd_worker *worker)
GHashTableIter iter;
gpointer key, value;
guint i;
const ev_tstamp save_stats_interval = 60; /* 1 minute */
gpointer m;
g_assert (rspamd_worker_check_context (worker->ctx, rspamd_controller_ctx_magic));
@ -3711,34 +3516,8 @@ start_controller_worker (struct rspamd_worker *worker)
&ctx->secure_map, NULL);
}
rspamd_controller_load_saved_stats (ctx->srv, ctx->cfg);
ctx->lang_det = ctx->cfg->lang_det;
/* RRD collector */
if (ctx->cfg->rrd_file && worker->index == 0) {
GError *rrd_err = NULL;
ctx->rrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err);
if (ctx->rrd) {
ctx->rrd_event.data = ctx;
ev_timer_init (&ctx->rrd_event, rspamd_controller_rrd_update,
rrd_update_time, rrd_update_time);
ev_timer_start (ctx->event_loop, &ctx->rrd_event);
}
else if (rrd_err) {
msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file,
rrd_err);
g_error_free (rrd_err);
}
else {
msg_err ("cannot load rrd from %s: unknown error", ctx->cfg->rrd_file);
}
}
else {
ctx->rrd = NULL;
}
rspamd_controller_password_sane (ctx, ctx->password, "normal password");
rspamd_controller_password_sane (ctx, ctx->enable_password, "enable "
"password");
@ -3871,27 +3650,7 @@ start_controller_worker (struct rspamd_worker *worker)
rspamd_symcache_start_refresh (worker->srv->cfg->cache, ctx->event_loop,
worker);
rspamd_stat_init (worker->srv->cfg, ctx->event_loop);
if (worker->index == 0) {
if (!ctx->cfg->disable_monitored) {
rspamd_worker_init_monitored (worker, ctx->event_loop, ctx->resolver);
}
rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
ctx->resolver, worker, TRUE);
/* Schedule periodic stats saving, see #1823 */
ctx->save_stats_event.data = ctx;
ev_timer_init (&ctx->save_stats_event,
rspamd_controller_stats_save_periodic,
save_stats_interval, save_stats_interval);
ev_timer_start (ctx->event_loop, &ctx->save_stats_event);
}
else {
rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
ctx->resolver, worker, FALSE);
}
rspamd_worker_init_controller (worker, &ctx->rrd);
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop, worker);
#ifdef WITH_HYPERSCAN
@ -3904,13 +3663,7 @@ start_controller_worker (struct rspamd_worker *worker)
/* Start event loop */
ev_loop (ctx->event_loop, 0);
rspamd_worker_block_signals ();
rspamd_controller_on_terminate (worker);
if (ctx->rrd) {
msg_info ("closing rrd file: %s", ctx->rrd->filename);
ev_timer_stop (ctx->event_loop, &ctx->rrd_event);
rspamd_rrd_close (ctx->rrd);
}
rspamd_controller_on_terminate (worker, ctx->rrd);
rspamd_stat_close ();
rspamd_http_router_free (ctx->http);

278
src/libserver/worker_util.c

@ -25,6 +25,7 @@
#include "libutil/map_private.h"
#include "libutil/http_private.h"
#include "libutil/http_router.h"
#include "libutil/rrd.h"
#ifdef WITH_GPERF_TOOLS
#include <gperftools/profiler.h>
@ -1716,4 +1717,281 @@ rspamd_worker_init_scanner (struct rspamd_worker *worker,
worker->srv->cfg);
*plang_det = worker->srv->cfg->lang_det;
}
void
rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main,
struct rspamd_config *cfg)
{
struct rspamd_stat *stat;
ucl_object_t *top, *sub;
struct ucl_emitter_functions *efuncs;
gint i, fd;
gchar fpath[PATH_MAX];
if (cfg->stats_file == NULL) {
return;
}
rspamd_snprintf (fpath, sizeof (fpath), "%s.XXXXXXXX", cfg->stats_file);
fd = g_mkstemp_full (fpath, O_WRONLY|O_TRUNC, 00644);
if (fd == -1) {
msg_err_config ("cannot open for writing controller stats from %s: %s",
fpath, strerror (errno));
return;
}
stat = rspamd_main->stat;
top = ucl_object_typed_new (UCL_OBJECT);
ucl_object_insert_key (top, ucl_object_fromint (
stat->messages_scanned), "scanned", 0, false);
ucl_object_insert_key (top, ucl_object_fromint (
stat->messages_learned), "learned", 0, false);
if (stat->messages_scanned > 0) {
sub = ucl_object_typed_new (UCL_OBJECT);
for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
ucl_object_insert_key (sub,
ucl_object_fromint (stat->actions_stat[i]),
rspamd_action_to_str (i), 0, false);
}
ucl_object_insert_key (top, sub, "actions", 0, false);
}
ucl_object_insert_key (top,
ucl_object_fromint (stat->connections_count),
"connections", 0, false);
ucl_object_insert_key (top,
ucl_object_fromint (stat->control_connections_count),
"control_connections", 0, false);
efuncs = ucl_object_emit_fd_funcs (fd);
if (!ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT,
efuncs, NULL)) {
msg_err_config ("cannot write stats to %s: %s",
fpath, strerror (errno));
unlink (fpath);
}
else {
if (rename (fpath, cfg->stats_file) == -1) {
msg_err_config ("cannot rename stats from %s to %s: %s",
fpath, cfg->stats_file, strerror (errno));
}
}
ucl_object_unref (top);
close (fd);
ucl_object_emit_funcs_free (efuncs);
}
static ev_timer rrd_timer;
void
rspamd_controller_on_terminate (struct rspamd_worker *worker,
struct rspamd_rrd_file *rrd)
{
struct rspamd_abstract_worker_ctx *ctx;
ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
rspamd_controller_store_saved_stats (worker->srv, worker->srv->cfg);
if (rrd) {
ev_timer_stop (ctx->event_loop, &rrd_timer);
msg_info ("closing rrd file: %s", rrd->filename);
rspamd_rrd_close (rrd);
}
}
static void
rspamd_controller_load_saved_stats (struct rspamd_main *rspamd_main,
struct rspamd_config *cfg)
{
struct ucl_parser *parser;
ucl_object_t *obj;
const ucl_object_t *elt, *subelt;
struct rspamd_stat *stat, stat_copy;
gint i;
if (cfg->stats_file == NULL) {
return;
}
if (access (cfg->stats_file, R_OK) == -1) {
msg_err_config ("cannot load controller stats from %s: %s",
cfg->stats_file, strerror (errno));
return;
}
parser = ucl_parser_new (0);
if (!ucl_parser_add_file (parser, cfg->stats_file)) {
msg_err_config ("cannot parse controller stats from %s: %s",
cfg->stats_file, ucl_parser_get_error (parser));
ucl_parser_free (parser);
return;
}
obj = ucl_parser_get_object (parser);
ucl_parser_free (parser);
stat = rspamd_main->stat;
memcpy (&stat_copy, stat, sizeof (stat_copy));
elt = ucl_object_lookup (obj, "scanned");
if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
stat_copy.messages_scanned = ucl_object_toint (elt);
}
elt = ucl_object_lookup (obj, "learned");
if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
stat_copy.messages_learned = ucl_object_toint (elt);
}
elt = ucl_object_lookup (obj, "actions");
if (elt != NULL) {
for (i = METRIC_ACTION_REJECT; i <= METRIC_ACTION_NOACTION; i++) {
subelt = ucl_object_lookup (elt, rspamd_action_to_str (i));
if (subelt && ucl_object_type (subelt) == UCL_INT) {
stat_copy.actions_stat[i] = ucl_object_toint (subelt);
}
}
}
elt = ucl_object_lookup (obj, "connections_count");
if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
stat_copy.connections_count = ucl_object_toint (elt);
}
elt = ucl_object_lookup (obj, "control_connections_count");
if (elt != NULL && ucl_object_type (elt) == UCL_INT) {
stat_copy.control_connections_count = ucl_object_toint (elt);
}
ucl_object_unref (obj);
memcpy (stat, &stat_copy, sizeof (stat_copy));
}
struct rspamd_controller_periodics_cbdata {
struct rspamd_worker *worker;
struct rspamd_rrd_file *rrd;
struct rspamd_stat *stat;
ev_timer save_stats_event;
};
static void
rspamd_controller_rrd_update (EV_P_ ev_timer *w, int revents)
{
struct rspamd_controller_periodics_cbdata *cbd =
(struct rspamd_controller_periodics_cbdata *)w->data;
struct rspamd_stat *stat;
GArray ar;
gdouble points[METRIC_ACTION_MAX];
GError *err = NULL;
guint i;
g_assert (cbd->rrd != NULL);
stat = cbd->stat;
for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i ++) {
points[i] = stat->actions_stat[i];
}
ar.data = (gchar *)points;
ar.len = sizeof (points);
if (!rspamd_rrd_add_record (cbd->rrd, &ar, rspamd_get_calendar_ticks (),
&err)) {
msg_err ("cannot update rrd file: %e", err);
g_error_free (err);
}
/* Plan new event */
ev_timer_again (EV_A_ w);
}
static void
rspamd_controller_stats_save_periodic (EV_P_ ev_timer *w, int revents)
{
struct rspamd_controller_periodics_cbdata *cbd =
(struct rspamd_controller_periodics_cbdata *)w->data;
rspamd_controller_store_saved_stats (cbd->worker->srv, cbd->worker->srv->cfg);
ev_timer_again (EV_A_ w);
}
void
rspamd_worker_init_controller (struct rspamd_worker *worker,
struct rspamd_rrd_file **prrd)
{
struct rspamd_abstract_worker_ctx *ctx;
static const ev_tstamp rrd_update_time = 1.0;
ctx = (struct rspamd_abstract_worker_ctx *)worker->ctx;
rspamd_controller_load_saved_stats (worker->srv, worker->srv->cfg);
if (worker->index == 0) {
/* Enable periodics and other stuff */
static struct rspamd_controller_periodics_cbdata cbd;
const ev_tstamp save_stats_interval = 60; /* 1 minute */
memset (&cbd, 0, sizeof (cbd));
cbd.save_stats_event.data = &cbd;
cbd.worker = worker;
cbd.stat = worker->srv->stat;
ev_timer_init (&cbd.save_stats_event,
rspamd_controller_stats_save_periodic,
save_stats_interval, save_stats_interval);
ev_timer_start (ctx->event_loop, &cbd.save_stats_event);
rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
ctx->resolver, worker, TRUE);
if (prrd != NULL) {
if (ctx->cfg->rrd_file && worker->index == 0) {
GError *rrd_err = NULL;
*prrd = rspamd_rrd_file_default (ctx->cfg->rrd_file, &rrd_err);
if (*prrd) {
cbd.rrd = *prrd;
rrd_timer.data = &cbd;
ev_timer_init (&rrd_timer, rspamd_controller_rrd_update,
rrd_update_time, rrd_update_time);
ev_timer_start (ctx->event_loop, &rrd_timer);
}
else if (rrd_err) {
msg_err ("cannot load rrd from %s: %e", ctx->cfg->rrd_file,
rrd_err);
g_error_free (rrd_err);
}
else {
msg_err ("cannot load rrd from %s: unknown error",
ctx->cfg->rrd_file);
}
}
else {
*prrd = NULL;
}
}
if (!ctx->cfg->disable_monitored) {
rspamd_worker_init_monitored (worker,
ctx->event_loop, ctx->resolver);
}
}
else {
rspamd_map_watch (worker->srv->cfg, ctx->event_loop,
ctx->resolver, worker, FALSE);
}
}

23
src/libserver/worker_util.h

@ -244,11 +244,30 @@ gboolean rspamd_check_termination_clause (struct rspamd_main *rspamd_main,
*/
gboolean rspamd_worker_call_finish_handlers (struct rspamd_worker *worker);
struct rspamd_rrd_file;
/**
* Defined in controller.c
* Terminate controller worker
* @param worker
*/
extern void rspamd_controller_on_terminate (struct rspamd_worker *worker);
void rspamd_controller_on_terminate (struct rspamd_worker *worker,
struct rspamd_rrd_file *rrd);
/**
* Inits controller worker
* @param worker
* @param ev_base
* @param prrd
*/
void rspamd_worker_init_controller (struct rspamd_worker *worker,
struct rspamd_rrd_file **prrd);
/**
* Saves stats
* @param rspamd_main
* @param cfg
*/
void rspamd_controller_store_saved_stats (struct rspamd_main *rspamd_main,
struct rspamd_config *cfg);
#ifdef WITH_HYPERSCAN
struct rspamd_control_command;

17
src/rspamd_proxy.c

@ -2216,6 +2216,7 @@ void
start_rspamd_proxy (struct rspamd_worker *worker)
{
struct rspamd_proxy_ctx *ctx = worker->ctx;
gboolean is_controller = FALSE;
g_assert (rspamd_worker_check_context (worker->ctx, rspamd_rspamd_proxy_magic));
ctx->cfg = worker->srv->cfg;
@ -2235,9 +2236,6 @@ start_rspamd_proxy (struct rspamd_worker *worker)
(rspamd_mempool_destruct_t)rspamd_http_context_free,
ctx->http_ctx);
rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver,
worker, 0);
if (ctx->has_self_scan) {
/* Additional initialisation needed */
rspamd_worker_init_scanner (worker, ctx->event_loop, ctx->resolver,
@ -2273,6 +2271,7 @@ start_rspamd_proxy (struct rspamd_worker *worker)
msg_info ("no controller or normal workers defined, execute "
"controller periodics in this worker");
worker->flags |= RSPAMD_WORKER_CONTROLLER;
is_controller = TRUE;
}
}
}
@ -2294,6 +2293,14 @@ start_rspamd_proxy (struct rspamd_worker *worker)
ctx->milter_ctx.cfg = ctx->cfg;
rspamd_milter_init_library (&ctx->milter_ctx);
if (is_controller) {
rspamd_worker_init_controller (worker, NULL);
}
else {
rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver,
worker, 0);
}
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);
adjust_upstreams_limits (ctx);
@ -2305,6 +2312,10 @@ start_rspamd_proxy (struct rspamd_worker *worker)
rspamd_stat_close ();
}
if (is_controller) {
rspamd_controller_on_terminate (worker, NULL);
}
REF_RELEASE (ctx->cfg);
rspamd_log_close (worker->srv->logger, TRUE);

11
src/worker.c

@ -507,7 +507,6 @@ start_worker (struct rspamd_worker *worker)
ctx->resolver = rspamd_dns_resolver_init (worker->srv->logger,
ctx->event_loop,
worker->srv->cfg);
rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver, worker, 0);
rspamd_upstreams_library_config (worker->srv->cfg, ctx->cfg->ups_ctx,
ctx->event_loop, ctx->resolver->r);
@ -548,6 +547,14 @@ start_worker (struct rspamd_worker *worker)
}
}
if (is_controller) {
rspamd_worker_init_controller (worker, NULL);
}
else {
rspamd_map_watch (worker->srv->cfg, ctx->event_loop, ctx->resolver,
worker, 0);
}
rspamd_lua_run_postloads (ctx->cfg->lua_state, ctx->cfg, ctx->event_loop,
worker);
@ -555,7 +562,7 @@ start_worker (struct rspamd_worker *worker)
rspamd_worker_block_signals ();
if (is_controller) {
rspamd_controller_on_terminate (
rspamd_controller_on_terminate (worker, NULL);
}
rspamd_stat_close ();

Loading…
Cancel
Save