Browse Source

* Add rolling history feature saving last 200 scanned messages.

Add /history command handler in webui.
rspamd-0.5
Vsevolod Stakhov 13 years ago
parent
commit
012167478a
  1. 1
      lib/CMakeLists.txt
  2. 3
      src/main.c
  3. 3
      src/main.h
  4. 15
      src/protocol.c
  5. 145
      src/roll_history.c
  6. 90
      src/roll_history.h
  7. 4
      src/smtp_utils.c
  8. 7
      src/util.c
  9. 4
      src/util.h
  10. 139
      src/webui.c

1
lib/CMakeLists.txt

@ -39,6 +39,7 @@ SET(LIBRSPAMDSERVERSRC
../src/kvstorage_file.c
../src/lmtp_proto.c
../src/proxy.c
../src/roll_history.c
../src/settings.c
../src/spf.c
../src/statfile.c

3
src/main.c

@ -1022,6 +1022,9 @@ main (gint argc, gchar **argv, gchar **env)
config_logger (rspamd_main, type, TRUE);
/* Create rolling history */
rspamd_main->history = rspamd_roll_history_new (rspamd_main->server_pool);
msg_info ("rspamd " RVERSION " is starting, build id: " RID);
rspamd_main->cfg->cfg_name = memory_pool_strdup (rspamd_main->cfg->cfg_pool, rspamd_main->cfg->cfg_name);

3
src/main.h

@ -19,6 +19,7 @@
#include "events.h"
#include "util.h"
#include "logger.h"
#include "roll_history.h"
/* Default values */
#define FIXED_CONFIG_FILE ETC_PREFIX "/rspamd.xml"
@ -104,6 +105,7 @@ struct rspamd_main {
uid_t workers_uid; /**< worker's uid running to */
gid_t workers_gid; /**< worker's gid running to */
gboolean is_privilleged; /**< true if run in privilleged mode */
struct roll_history *history; /**< rolling history */
};
struct counter_data {
@ -258,6 +260,7 @@ struct worker_task {
#endif
struct timeval tv; /**< time of connection */
struct rspamd_view *view; /**< matching view */
guint32 scan_milliseconds; /**< how much milliseconds passed */
gboolean view_checked;
gboolean pass_all_filters; /**< pass task throught every rule */
guint32 parser_recursion; /**< for avoiding recursion stack overflow */

15
src/protocol.c

@ -1291,11 +1291,11 @@ show_metric_result (gpointer metric_name, gpointer metric_value, void *user_data
cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset,
cd->log_size - cd->log_offset, "]), len: %z, time: %s, dns req: %d,",
task->msg->len, calculate_check_time (&task->tv, &task->ts,
task->cfg->clock_res), task->dns_requests);
task->cfg->clock_res, &task->scan_milliseconds), task->dns_requests);
#else
cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset,
"]), len: %z, time: %s, dns req: %d,",
task->msg->len, calculate_check_time (&task->tv, task->cfg->clock_res), task->dns_requests);
task->msg->len, calculate_check_time (&task->tv, task->cfg->clock_res, &task->scan_milliseconds), task->dns_requests);
#endif
}
}
@ -1362,6 +1362,7 @@ write_check_reply (struct worker_task *task)
}
cd.alive = TRUE;
if (task->proto == SPAMC_PROTO && !task->is_http) {
/* Ignore metrics, just write report for 'default' metric */
@ -1379,6 +1380,8 @@ write_check_reply (struct worker_task *task)
return FALSE;
}
}
/* Update history */
rspamd_roll_history_update (task->worker->srv->history, task);
}
else {
/* Show default metric first */
@ -1401,6 +1404,8 @@ write_check_reply (struct worker_task *task)
return FALSE;
}
}
/* Update history */
rspamd_roll_history_update (task->worker->srv->history, task);
g_hash_table_remove (task->results, "default");
/* Write result for each metric separately */
@ -1433,6 +1438,7 @@ write_check_reply (struct worker_task *task)
write_hashes_to_log (task, logbuf, cd.log_offset, cd.log_size);
msg_info ("%s", logbuf);
if (!task->is_json) {
/* Write message id */
if (task->proto == RSPAMC_PROTO && task->proto_ver >= 12) {
@ -1518,6 +1524,7 @@ write_process_reply (struct worker_task *task)
cd.log_size = sizeof (logbuf);
cd.alive = TRUE;
if (task->proto == SPAMC_PROTO) {
/* Ignore metrics, just write report for 'default' metric */
metric_res = g_hash_table_lookup (task->results, "default");
@ -1534,6 +1541,8 @@ write_process_reply (struct worker_task *task)
return FALSE;
}
}
/* Update history */
rspamd_roll_history_update (task->worker->srv->history, task);
}
else {
/* Show default metric first */
@ -1551,6 +1560,8 @@ write_process_reply (struct worker_task *task)
return FALSE;
}
}
/* Update history */
rspamd_roll_history_update (task->worker->srv->history, task);
g_hash_table_remove (task->results, "default");
/* Write result for each metric separately */

145
src/roll_history.c

@ -0,0 +1,145 @@
/* Copyright (c) 2010-2012, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "config.h"
#include "main.h"
#include "roll_history.h"
/**
* Returns new roll history
* @param pool pool for shared memory
* @return new structure
*/
struct roll_history*
rspamd_roll_history_new (memory_pool_t *pool)
{
struct roll_history *new;
if (pool == NULL) {
return NULL;
}
new = memory_pool_alloc0_shared (pool, sizeof (struct roll_history));
new->pool = pool;
new->mtx = memory_pool_get_mutex (pool);
return new;
}
struct history_metric_callback_data {
gchar *pos;
gint remain;
};
static void
roll_history_symbols_callback (gpointer key, gpointer value, void *user_data)
{
struct history_metric_callback_data *cb = user_data;
struct symbol *s = value;
guint wr;
if (cb->remain > 0) {
wr = rspamd_snprintf (cb->pos, cb->remain, "%s, ", s->name);
cb->pos += wr;
cb->remain -= wr;
}
}
/**
* Update roll history with data from task
* @param history roll history object
* @param task task object
*/
void
rspamd_roll_history_update (struct roll_history *history, struct worker_task *task)
{
gint row_num;
struct roll_history_row *row;
struct metric_result *metric_res;
struct history_metric_callback_data cbdata;
if (history->need_lock) {
/* Some process is getting history, so wait on a mutex */
memory_pool_lock_mutex (history->mtx);
history->need_lock = FALSE;
memory_pool_unlock_mutex (history->mtx);
}
/* First of all obtain check and obtain row number */
g_atomic_int_compare_and_exchange (&history->cur_row, HISTORY_MAX_ROWS, 0);
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
row_num = g_atomic_int_add (&history->cur_row, 1);
#else
row_num = g_atomic_int_exchange_and_add (&history->cur_row, 1);
#endif
if (row_num < HISTORY_MAX_ROWS) {
row = &history->rows[row_num];
row->completed = FALSE;
}
else {
msg_err ("internal error with history roll occured, row number is invalid: %d", row_num);
return;
}
/* Add information from task to roll history */
memcpy (&row->from_addr, &task->from_addr, sizeof (row->from_addr));
memcpy (&row->tv, &task->tv, sizeof (row->tv));
/* Strings */
rspamd_strlcpy (row->message_id, task->message_id, sizeof (row->message_id));
if (task->user) {
rspamd_strlcpy (row->user, task->user, sizeof (row->message_id));
}
else {
row->user[0] = '\0';
}
/* Get default metric */
metric_res = g_hash_table_lookup (task->results, DEFAULT_METRIC);
if (metric_res == NULL) {
row->symbols[0] = '\0';
row->action = METRIC_ACTION_NOACTION;
}
else {
row->score = metric_res->score;
row->required_score = metric_res->metric->required_score;
row->action = check_metric_action (metric_res->score, metric_res->metric->required_score, metric_res->metric);
cbdata.pos = row->symbols;
cbdata.remain = sizeof (row->symbols);
g_hash_table_foreach (metric_res->symbols, roll_history_symbols_callback, &cbdata);
if (cbdata.remain > 0) {
/* Remove last whitespace and comma */
*cbdata.pos-- = '\0';
*cbdata.pos-- = '\0';
*cbdata.pos = '\0';
}
}
row->scan_time = task->scan_milliseconds;
row->len = task->msg->len;
row->completed = TRUE;
}

90
src/roll_history.h

@ -0,0 +1,90 @@
/* Copyright (c) 2010-2012, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef ROLL_HISTORY_H_
#define ROLL_HISTORY_H_
#include "config.h"
#include "mem_pool.h"
/*
* Roll history is a special cycled buffer for checked messages, it is designed for writing history messages
* and displaying them in webui
*/
#define HISTORY_MAX_ID 100
#define HISTORY_MAX_SYMBOLS 200
#define HISTORY_MAX_USER 20
#define HISTORY_MAX_ROWS 200
struct worker_task;
struct roll_history_row {
struct timeval tv;
gchar message_id[HISTORY_MAX_ID];
gchar symbols[HISTORY_MAX_SYMBOLS];
gchar user[HISTORY_MAX_USER];
#ifdef HAVE_INET_PTON
struct {
union {
struct in_addr in4;
struct in6_addr in6;
} d;
gboolean ipv6;
gboolean has_addr;
} from_addr;
#else
struct in_addr from_addr;
#endif
gsize len;
guint scan_time;
gint action;
gdouble score;
gdouble required_score;
guint8 completed;
};
struct roll_history {
struct roll_history_row rows[HISTORY_MAX_ROWS];
gint cur_row;
memory_pool_t *pool;
gboolean need_lock;
memory_pool_mutex_t *mtx;
};
/**
* Returns new roll history
* @param pool pool for shared memory
* @return new structure
*/
struct roll_history* rspamd_roll_history_new (memory_pool_t *pool);
/**
* Update roll history with data from task
* @param history roll history object
* @param task task object
*/
void rspamd_roll_history_update (struct roll_history *history, struct worker_task *task);
#endif /* ROLL_HISTORY_H_ */

4
src/smtp_utils.c

@ -184,10 +184,10 @@ smtp_metric_callback (gpointer key, gpointer value, gpointer ud)
#ifdef HAVE_CLOCK_GETTIME
cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "]), len: %z, time: %s,",
task->msg->len, calculate_check_time (&task->tv, &task->ts, task->cfg->clock_res));
task->msg->len, calculate_check_time (&task->tv, &task->ts, task->cfg->clock_res, &task->scan_milliseconds));
#else
cd->log_offset += rspamd_snprintf (cd->log_buf + cd->log_offset, cd->log_size - cd->log_offset, "]), len: %z, time: %s,",
task->msg->len, calculate_check_time (&task->tv, task->cfg->clock_res));
task->msg->len, calculate_check_time (&task->tv, task->cfg->clock_res, &task->scan_milliseconds));
#endif
}

7
src/util.c

@ -902,16 +902,17 @@ resolve_stat_filename (memory_pool_t * pool, gchar *pattern, gchar *rcpt, gchar
#ifdef HAVE_CLOCK_GETTIME
const gchar *
calculate_check_time (struct timeval *tv, struct timespec *begin, gint resolution)
calculate_check_time (struct timeval *tv, struct timespec *begin, gint resolution, guint32 *scan_time)
#else
const gchar *
calculate_check_time (struct timeval *begin, gint resolution)
calculate_check_time (struct timeval *begin, gint resolution, guint32 *scan_time)
#endif
{
double vdiff, diff;
static gchar res[64];
static gchar fmt[sizeof ("%.10f ms real, %.10f ms virtual")];
struct timeval tv_now;
if (gettimeofday (&tv_now, NULL) == -1) {
msg_warn ("gettimeofday failed: %s", strerror (errno));
}
@ -937,6 +938,8 @@ calculate_check_time (struct timeval *begin, gint resolution)
vdiff = diff;
#endif
*scan_time = diff;
sprintf (fmt, "%%.%dfms real, %%.%dfms virtual", resolution, resolution);
snprintf (res, sizeof (res), fmt, diff, vdiff);

4
src/util.h

@ -128,9 +128,9 @@ gchar* resolve_stat_filename (memory_pool_t *pool, gchar *pattern, gchar *rcpt,
/*
* Calculate check time with specified resolution of timer
*/
const gchar* calculate_check_time (struct timeval *tv, struct timespec *begin, gint resolution);
const gchar* calculate_check_time (struct timeval *tv, struct timespec *begin, gint resolution, guint32 *scan_ms);
#else
const gchar* calculate_check_time (struct timeval *begin, gint resolution);
const gchar* calculate_check_time (struct timeval *begin, gint resolution, guint32 *scan_ms);
#endif
/*

139
src/webui.c

@ -66,6 +66,8 @@
#define PATH_MAPS "/maps"
#define PATH_GET_MAP "/getmap"
#define PATH_GRAPH "/graph"
#define PATH_PIE_CHART "/pie"
#define PATH_HISTORY "/history"
/* Graph colors */
#define COLOR_CLEAN "#58A458"
@ -640,6 +642,141 @@ http_handle_graph (struct evhttp_request *req, gpointer arg)
evbuffer_free (evb);
}
/*
* Pie chart command handler:
* request: /pie
* headers: Password
* reply: json [
* { label: "Foo", data: 11 },
* { label: "Bar", data: 20 },
* {...}
* ]
*/
static void
http_handle_pie_chart (struct evhttp_request *req, gpointer arg)
{
struct rspamd_webui_worker_ctx *ctx = arg;
struct evbuffer *evb;
gdouble data[4], total;
evb = evbuffer_new ();
if (!evb) {
msg_err ("cannot allocate evbuffer for reply");
evhttp_send_reply (req, HTTP_INTERNAL, "500 insufficient memory", NULL);
return;
}
total = ctx->srv->stat->messages_scanned;
if (total != 0) {
data[0] = ctx->srv->stat->actions_stat[METRIC_ACTION_NOACTION] / total * 100.;
data[1] = (ctx->srv->stat->actions_stat[METRIC_ACTION_ADD_HEADER] + ctx->srv->stat->actions_stat[METRIC_ACTION_REWRITE_SUBJECT]) / total * 100.;
data[2] = ctx->srv->stat->actions_stat[METRIC_ACTION_GREYLIST] / total * 100.;
data[3] = ctx->srv->stat->actions_stat[METRIC_ACTION_REJECT] / total * 100.;
evbuffer_add_printf (evb, "[{\"label\": \"Clean messages\", \"color\": \""
COLOR_CLEAN "\", \"data\":%.2f},", data[0]);
evbuffer_add_printf (evb, "{\"label\": \"Probable spam messages\", \"color\": \""
COLOR_PROBABLE_SPAM "\", \"data\":%.2f},", data[1]);
evbuffer_add_printf (evb, "{\"label\": \"Greylisted messages\", \"color\": \""
COLOR_GREYLIST "\", \"data\":%.2f},", data[2]);
evbuffer_add_printf (evb, "{\"label\": \"Rejected messages\", \"color\": \""
COLOR_REJECT "\", \"data\":%.2f}]" CRLF, data[3]);
}
else {
evbuffer_add_printf (evb, "[{\"label\": \"Not scanned messages\", \"data\": 0}]" CRLF);
}
evhttp_add_header (req->output_headers, "Connection", "close");
http_calculate_content_length (evb, req);
evhttp_send_reply (req, HTTP_OK, "OK", evb);
evbuffer_free (evb);
}
/*
* History command handler:
* request: /history
* headers: Password
* reply: json [
* { label: "Foo", data: 11 },
* { label: "Bar", data: 20 },
* {...}
* ]
*/
static void
http_handle_history (struct evhttp_request *req, gpointer arg)
{
struct rspamd_webui_worker_ctx *ctx = arg;
struct evbuffer *evb;
struct roll_history_row *row;
struct roll_history copied_history;
gint i, row_num;
struct tm *tm;
gchar timebuf[32];
gchar ip_buf[INET6_ADDRSTRLEN];
evb = evbuffer_new ();
if (!evb) {
msg_err ("cannot allocate evbuffer for reply");
evhttp_send_reply (req, HTTP_INTERNAL, "500 insufficient memory", NULL);
return;
}
/* Set lock on history */
memory_pool_lock_mutex (ctx->srv->history->mtx);
ctx->srv->history->need_lock = TRUE;
/* Copy locked */
memcpy (&copied_history, ctx->srv->history, sizeof (copied_history));
memory_pool_unlock_mutex (ctx->srv->history->mtx);
/* Trailer */
evbuffer_add (evb, "[", 1);
/* Go throught all rows */
row_num = copied_history.cur_row;
for (i = 0; i < HISTORY_MAX_ROWS; i ++, row_num ++) {
if (row_num == HISTORY_MAX_ROWS) {
row_num = 0;
}
row = &copied_history.rows[row_num];
/* Get only completed rows */
if (row->completed) {
tm = localtime (&row->tv.tv_sec);
strftime (timebuf, sizeof (timebuf), "%F %H:%M:%S", tm);
#ifdef HAVE_INET_PTON
if (row->from_addr.ipv6) {
inet_ntop (AF_INET6, &row->from_addr.d.in6, ip_buf, sizeof (ip_buf));
}
else {
inet_ntop (AF_INET, &row->from_addr.d.in4, ip_buf, sizeof (ip_buf));
}
#else
rspamd_strlcpy (ip_buf, inet_ntoa (task->from_addr), sizeof (ip_buf));
#endif
if (row->user[0] != '\0') {
evbuffer_add_printf (evb, "{\"time\":\"%s\",\"id\":\"%s\",\"ip\":\"%s\",\"action\":\"%s\","
"\"score\":%.2f,\"required_score\": %.2f,\"symbols\":\"%s\",\"size\":%zd,\"scan_time\":%u,"
"\"user\":\"%s\"}%s", timebuf, row->message_id, ip_buf, str_action_metric (row->action),
row->score, row->required_score, row->symbols, row->len, row->scan_time, row->user, i == HISTORY_MAX_ROWS - 1 ? "" : ",");
}
else {
evbuffer_add_printf (evb, "{\"time\":\"%s\",\"id\":\"%s\",\"ip\":\"%s\",\"action\":\"%s\","
"\"score\": %.2f,\"required_score\":%.2f,\"symbols\":\"%s\",\"size\":%zd,\"scan_time\":%u}%s",
timebuf, row->message_id, ip_buf, str_action_metric (row->action),
row->score, row->required_score, row->symbols, row->len, row->scan_time, i == HISTORY_MAX_ROWS - 1 ? "" : ",");
}
}
}
evbuffer_add (evb, "]" CRLF, 3);
evhttp_add_header (req->output_headers, "Connection", "close");
http_calculate_content_length (evb, req);
evhttp_send_reply (req, HTTP_OK, "OK", evb);
evbuffer_free (evb);
}
gpointer
init_webui_worker (void)
@ -719,6 +856,8 @@ start_webui_worker (struct rspamd_worker *worker)
evhttp_set_cb (ctx->http, PATH_MAPS, http_handle_maps, ctx);
evhttp_set_cb (ctx->http, PATH_GET_MAP, http_handle_get_map, ctx);
evhttp_set_cb (ctx->http, PATH_GRAPH, http_handle_graph, ctx);
evhttp_set_cb (ctx->http, PATH_PIE_CHART, http_handle_pie_chart, ctx);
evhttp_set_cb (ctx->http, PATH_HISTORY, http_handle_history, ctx);
ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);

Loading…
Cancel
Save