Browse Source

Parse HTTP requests, clean up the code.

--HG--
extra : rebase_source : 6b35fbf55f
pull/26/head
Vsevolod Stakhov 12 years ago
parent
commit
e2f2eed337
  1. 2
      src/cfg_utils.c
  2. 3
      src/controller.c
  3. 7
      src/main.h
  4. 24
      src/plugins/surbl.c
  5. 659
      src/protocol.c
  6. 42
      src/protocol.h
  7. 23
      src/util.c
  8. 5
      src/util.h
  9. 288
      src/worker.c

2
src/cfg_utils.c

@ -892,11 +892,13 @@ read_rspamd_config (struct config_file *cfg, const gchar *filename,
rspamd_ucl_add_conf_macros (parser, cfg);
if (!ucl_parser_add_chunk (parser, data, st.st_size)) {
msg_err ("ucl parser error: %s", ucl_parser_get_error (parser));
ucl_parser_free (parser);
munmap (data, st.st_size);
return FALSE;
}
munmap (data, st.st_size);
cfg->rcl_obj = ucl_parser_get_object (parser);
ucl_parser_free (parser);
res = TRUE;
}

3
src/controller.c

@ -1226,7 +1226,8 @@ process_header (f_str_t *line, struct controller_session *session)
struct rspamd_controller_ctx *ctx = session->worker->ctx;
controller_func_t custom_handler;
headern = separate_command (line, ':');
/* XXX: temporary workaround */
headern = NULL;
if (line == NULL || headern == NULL) {
msg_warn ("bad header: %V", line);

7
src/main.h

@ -187,15 +187,11 @@ struct worker_task {
CLOSING_CONNECTION,
WRITING_REPLY
} state; /**< current session state */
size_t content_length; /**< length of user's input */
enum rspamd_protocol proto; /**< protocol (rspamc or spamc) */
guint proto_ver; /**< protocol version */
enum rspamd_command cmd; /**< command */
struct custom_command *custom_cmd; /**< custom command if any */
gint sock; /**< socket descriptor */
gboolean is_mime; /**< if this task is mime task */
gboolean is_json; /**< output is JSON */
gboolean is_http; /**< output is HTTP */
gboolean allow_learn; /**< allow learning */
gboolean is_skipped; /**< whether message was skipped by configuration */
@ -203,7 +199,7 @@ struct worker_task {
gchar *from; /**< from header value */
gchar *queue_id; /**< queue id if specified */
const gchar *message_id; /**< message id */
GList *rcpt; /**< recipients list */
GList *rcpt; /**< recipients list */
guint nrcpt; /**< number of recipients */
#ifdef HAVE_INET_PTON
struct {
@ -222,7 +218,6 @@ struct worker_task {
gchar *user; /**< user to deliver */
gchar *subject; /**< subject (for non-mime) */
gchar *hostname; /**< hostname reported by MTA */
gchar *statfile; /**< statfile for learning */
GString *msg; /**< message buffer */
rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */
struct rspamd_http_connection *http_conn; /**< HTTP server connection */

24
src/plugins/surbl.c

@ -57,11 +57,10 @@
static struct surbl_ctx *surbl_module_ctx = NULL;
static gint surbl_filter (struct worker_task *task);
static void surbl_test_url (struct worker_task *task, void *user_data);
static void dns_callback (struct rspamd_dns_reply *reply, gpointer arg);
static void process_dns_results (struct worker_task *task, struct suffix_item *suffix, gchar *url, guint32 addr);
static gint urls_command_handler (struct worker_task *task);
static void surbl_test_url (struct worker_task *task, void *user_data);
static void dns_callback (struct rspamd_dns_reply *reply, gpointer arg);
static void process_dns_results (struct worker_task *task,
struct suffix_item *suffix, gchar *url, guint32 addr);
#define NO_REGEXP (gpointer)-1
@ -213,7 +212,6 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx)
{
surbl_module_ctx = g_malloc (sizeof (struct surbl_ctx));
surbl_module_ctx->filter = surbl_filter;
surbl_module_ctx->use_redirector = 0;
surbl_module_ctx->suffixes = NULL;
surbl_module_ctx->surbl_pool = memory_pool_new (memory_pool_get_size ());
@ -237,8 +235,6 @@ surbl_module_init (struct config_file *cfg, struct module_ctx **ctx)
*ctx = (struct module_ctx *)surbl_module_ctx;
register_protocol_command ("urls", urls_command_handler);
return 0;
}
@ -449,7 +445,6 @@ surbl_module_reconfig (struct config_file *cfg)
/* Delete pool and objects */
memory_pool_delete (surbl_module_ctx->surbl_pool);
/* Reinit module */
surbl_module_ctx->filter = surbl_filter;
surbl_module_ctx->use_redirector = 0;
surbl_module_ctx->suffixes = NULL;
surbl_module_ctx->surbl_pool = memory_pool_new (memory_pool_get_size ());
@ -996,17 +991,10 @@ surbl_test_url (struct worker_task *task, void *user_data)
param.suffix = suffix;
g_tree_foreach (task->urls, surbl_tree_url_callback, &param);
}
static gint
surbl_filter (struct worker_task *task)
{
/* XXX: remove this shit */
return 0;
}
/*
* Handlers of URLS command
*/
#if 0
struct urls_tree_cb_data {
gchar *buf;
gsize len;
@ -1086,7 +1074,7 @@ urls_command_handler (struct worker_task *task)
return TRUE;
}
#endif
/*
* vi:ts=4
*/

659
src/protocol.c

@ -97,161 +97,109 @@
static GList *custom_commands = NULL;
/* XXX: legacy helper, remove it at some point */
/*
 * Translate a metric action into its textual name.
 * Any value that is not one of the listed enumerators yields "unknown action".
 */
static const gchar *
str_action_metric_spamc (enum rspamd_metric_action action)
{
	const gchar *name = "unknown action";

	switch (action) {
	case METRIC_ACTION_REJECT:
		name = "reject";
		break;
	case METRIC_ACTION_SOFT_REJECT:
		name = "soft reject";
		break;
	case METRIC_ACTION_REWRITE_SUBJECT:
		name = "rewrite subject";
		break;
	case METRIC_ACTION_ADD_HEADER:
		name = "add header";
		break;
	case METRIC_ACTION_GREYLIST:
		name = "greylist";
		break;
	case METRIC_ACTION_NOACTION:
		name = "no action";
		break;
	case METRIC_ACTION_MAX:
		name = "invalid max action";
		break;
	}

	return name;
}
static inline const gchar *
rspamc_proto_str (guint ver)
/*
* Remove <> from the fixed string and copy it to the pool
*/
static gchar *
rspamd_protocol_escape_braces (GString *in)
{
gint len = 0;
gchar *orig, *p;
if (G_LIKELY (ver == 12)) {
return "1.2";
orig = in->str;
while ((g_ascii_isspace (*orig) || *orig == '<') && orig - in->str < (gint)in->len) {
orig ++;
}
else if (G_UNLIKELY (ver == 11)) {
return "1.1";
}
else if (G_UNLIKELY (ver == 13)) {
return "1.3";
}
else if (G_UNLIKELY (ver == 14)) {
return "1.4";
}
else if (G_UNLIKELY (ver == 15)) {
return "1.5";
}
else {
return "1.0";
}
}
gchar *
separate_command (f_str_t * in, gchar c)
{
guint r = 0;
gchar *p = in->begin, *b;
b = p;
g_string_erase (in, 0, orig - in->str);
while (r < in->len) {
if (*p == c) {
*p = '\0';
in->begin = p + 1;
in->len -= r + 1;
return b;
}
else if (*p == '\0') {
/* Actually we cannot allow several \0 characters in string, so write to the log about it */
msg_warn ("cannot separate command with \0 character, this can be an attack attempt");
return NULL;
}
p++;
r++;
p = orig;
while ((!g_ascii_isspace (*p) && *p != '>') && p - in->str < (gint)in->len) {
p ++;
len ++;
}
return NULL;
g_string_truncate (in, len);
return in->str;
}
static gboolean
parse_check_command (struct worker_task *task, gchar *token)
rspamd_protocol_handle_url (struct worker_task *task, struct rspamd_http_message *msg)
{
GList *cur;
struct custom_command *cmd;
const gchar *p;
if (msg->url == NULL || msg->url->len == 0) {
task->last_error = "command is absent";
task->error_code = 400;
return FALSE;
}
if (msg->url->str[0] == '/') {
p = &msg->url->str[1];
}
else {
p = msg->url->str;
}
switch (token[0]) {
switch (*p) {
case 'c':
case 'C':
/* check */
if (g_ascii_strcasecmp (token + 1, MSG_CMD_CHECK + 1) == 0) {
if (g_ascii_strcasecmp (p + 1, MSG_CMD_CHECK + 1) == 0) {
task->cmd = CMD_CHECK;
}
else {
debug_task ("bad command: %s", token);
return FALSE;
goto err;
}
break;
case 's':
case 'S':
/* symbols, skip */
if (g_ascii_strcasecmp (token + 1, MSG_CMD_SYMBOLS + 1) == 0) {
if (g_ascii_strcasecmp (p + 1, MSG_CMD_SYMBOLS + 1) == 0) {
task->cmd = CMD_SYMBOLS;
}
else if (g_ascii_strcasecmp (token + 1, MSG_CMD_SKIP + 1) == 0) {
else if (g_ascii_strcasecmp (p + 1, MSG_CMD_SKIP + 1) == 0) {
task->cmd = CMD_SKIP;
}
else {
debug_task ("bad command: %s", token);
return FALSE;
goto err;
}
break;
case 'p':
case 'P':
/* ping, process */
if (g_ascii_strcasecmp (token + 1, MSG_CMD_PING + 1) == 0) {
if (g_ascii_strcasecmp (p + 1, MSG_CMD_PING + 1) == 0) {
task->cmd = CMD_PING;
}
else if (g_ascii_strcasecmp (token + 1, MSG_CMD_PROCESS + 1) == 0) {
else if (g_ascii_strcasecmp (p + 1, MSG_CMD_PROCESS + 1) == 0) {
task->cmd = CMD_PROCESS;
}
else {
debug_task ("bad command: %s", token);
return FALSE;
goto err;
}
break;
case 'r':
case 'R':
/* report, report_ifspam */
if (g_ascii_strcasecmp (token + 1, MSG_CMD_REPORT + 1) == 0) {
if (g_ascii_strcasecmp (p + 1, MSG_CMD_REPORT + 1) == 0) {
task->cmd = CMD_REPORT;
}
else if (g_ascii_strcasecmp (token + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) {
else if (g_ascii_strcasecmp (p + 1, MSG_CMD_REPORT_IFSPAM + 1) == 0) {
task->cmd = CMD_REPORT_IFSPAM;
}
else {
debug_task ("bad command: %s", token);
return FALSE;
}
break;
case 'l':
case 'L':
if (g_ascii_strcasecmp (token + 1, MSG_CMD_LEARN + 1) == 0) {
if (task->allow_learn) {
task->cmd = CMD_LEARN;
}
else {
msg_info ("learning is disabled");
return FALSE;
}
}
else {
debug_task ("bad command: %s", token);
return FALSE;
goto err;
}
break;
default:
cur = custom_commands;
while (cur) {
cmd = cur->data;
if (g_ascii_strcasecmp (token, cmd->name) == 0) {
if (g_ascii_strcasecmp (p, cmd->name) == 0) {
task->cmd = CMD_OTHER;
task->custom_cmd = cmd;
break;
@ -260,402 +208,191 @@ parse_check_command (struct worker_task *task, gchar *token)
}
if (cur == NULL) {
debug_task ("bad command: %s", token);
return FALSE;
goto err;
}
break;
}
return TRUE;
err:
debug_task ("bad command: %s", p);
task->last_error = "invalid command";
task->error_code = 400;
return FALSE;
}
static gboolean
parse_rspamc_command (struct worker_task *task, f_str_t * line)
rspamd_protocol_handle_headers (struct worker_task *task, struct rspamd_http_message *msg)
{
gchar *token;
gchar *headern, *err, *tmp;
gboolean res = TRUE;
struct rspamd_http_header *h;
/* Separate line */
token = separate_command (line, ' ');
if (line == NULL || token == NULL) {
debug_task ("bad command");
return FALSE;
}
LL_FOREACH (msg->headers, h) {
headern = h->name->str;
if (!parse_check_command (task, token)) {
return FALSE;
}
if (g_ascii_strncasecmp (line->begin, RSPAMC_GREETING, sizeof (RSPAMC_GREETING) - 1) == 0) {
task->proto = RSPAMC_PROTO;
task->proto_ver = 10;
if (*(line->begin + sizeof (RSPAMC_GREETING) - 1) == '/') {
/* Extract protocol version */
token = line->begin + sizeof (RSPAMC_GREETING);
if (strncmp (token, RSPAMC_PROTO_1_1, sizeof (RSPAMC_PROTO_1_1) - 1) == 0) {
task->proto_ver = 11;
switch (headern[0]) {
case 'd':
case 'D':
if (g_ascii_strcasecmp (headern, DELIVER_TO_HEADER) == 0) {
task->deliver_to = rspamd_protocol_escape_braces (h->value);
debug_task ("read deliver-to header, value: %s", task->deliver_to);
}
else if (strncmp (token, RSPAMC_PROTO_1_2, sizeof (RSPAMC_PROTO_1_2) - 1) == 0) {
task->proto_ver = 12;
else {
debug_task ("wrong header: %s", headern);
res = FALSE;
}
else if (strncmp (token, RSPAMC_PROTO_1_3, sizeof (RSPAMC_PROTO_1_3) - 1) == 0) {
task->proto_ver = 13;
break;
case 'h':
case 'H':
if (g_ascii_strcasecmp (headern, HELO_HEADER) == 0) {
task->helo = h->value->str;
debug_task ("read helo header, value: %s", task->helo);
}
}
}
else if (g_ascii_strncasecmp (line->begin, SPAMC_GREETING, sizeof (SPAMC_GREETING) - 1) == 0) {
task->proto = SPAMC_PROTO;
task->proto_ver = 12;
}
else {
return FALSE;
}
task->state = READ_HEADER;
return TRUE;
}
static gboolean
parse_http_command (struct worker_task *task, f_str_t * line)
{
guint8 *p, *end, *c;
gint state = 0, next_state = 0;
gchar *cmd;
p = line->begin;
c = p;
end = p + line->len;
task->proto = RSPAMC_PROTO;
while (p < end) {
switch (state) {
case 0:
/* Expect GET or POST here */
if ((end - p > 3 &&
(*p == 'G' || *p == 'g') &&
(p[1] == 'E' || p[1] == 'e') &&
(p[2] == 'T' || p[2] == 't')) ||
(end - p > 4 &&
(*p == 'P' || *p == 'p') &&
(p[1] == 'O' || p[1] == 'o') &&
(p[2] == 'S' || p[2] == 's') &&
(p[3] == 'T' || p[3] == 't'))) {
state = 99;
next_state = 1;
p += (*p == 'g' || *p == 'G') ? 3 : 4;
else if (g_ascii_strcasecmp (headern, HOSTNAME_HEADER) == 0) {
task->hostname = h->value->str;
debug_task ("read hostname header, value: %s", task->hostname);
}
else {
msg_info ("invalid HTTP request: %V", line);
return FALSE;
debug_task ("wrong header: %s", headern);
res = FALSE;
}
break;
case 1:
/* Get command or path */
if (!g_ascii_isspace (*p)) {
p ++;
case 'f':
case 'F':
if (g_ascii_strcasecmp (headern, FROM_HEADER) == 0) {
task->from = rspamd_protocol_escape_braces (h->value);
debug_task ("read from header, value: %s", task->from);
}
else {
/* Copy command */
cmd = memory_pool_alloc (task->task_pool, p - c + 1);
rspamd_strlcpy (cmd, c, p - c + 1);
/* Skip the first '/' */
if (*cmd == '/') {
cmd ++;
}
if (!parse_check_command (task, cmd)) {
/* Assume that command is symbols */
task->cmd = CMD_SYMBOLS;
}
state = 99;
next_state = 2;
debug_task ("wrong header: %s", headern);
res = FALSE;
}
break;
case 2:
/* Get HTTP/1.0 or HTTP/1.1 */
if (p == end - 1) {
/* We are at the end */
if (g_ascii_strncasecmp (c, "HTTP/1.0", sizeof ("HTTP/1.0") - 1) == 0 ||
g_ascii_strncasecmp (c, "HTTP/1.1", sizeof ("HTTP/1.1") - 1) == 0) {
task->state = READ_HEADER;
return TRUE;
}
case 'j':
case 'J':
if (g_ascii_strcasecmp (headern, JSON_HEADER) == 0) {
task->is_json = parse_flag (h->value->str);
}
else {
p ++;
debug_task ("wrong header: %s", headern);
res = FALSE;
}
break;
case 99:
/* Skip spaces */
if (g_ascii_isspace (*p)) {
p ++;
case 'q':
case 'Q':
if (g_ascii_strcasecmp (headern, QUEUE_ID_HEADER) == 0) {
task->queue_id = h->value->str;
debug_task ("read queue_id header, value: %s", task->queue_id);
}
else {
state = next_state;
c = p;
debug_task ("wrong header: %s", headern);
res = FALSE;
}
break;
}
}
return FALSE;
}
/*
 * Parse the command line of a request.
 * Sets task->proto_ver to 11 and then hands the line to the HTTP parser when
 * task->is_http is set, or to the rspamc parser otherwise.
 * Returns whatever the selected parser returns.
 */
static gboolean
parse_command (struct worker_task *task, f_str_t * line)
{
	task->proto_ver = 11;

	if (task->is_http) {
		return parse_http_command (task, line);
	}

	return parse_rspamc_command (task, line);
}
static gboolean
parse_header (struct worker_task *task, f_str_t * line)
{
gchar *headern, *err, *tmp;
gboolean res = TRUE;
/* Check end of headers */
if (line->len == 0) {
debug_task ("got empty line, assume it as end of headers");
if (task->cmd == CMD_PING || task->cmd == CMD_SKIP) {
task->state = WRITE_REPLY;
}
else {
if (task->content_length > 0) {
if (task->cmd == CMD_LEARN) {
if (task->statfile != NULL) {
rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length);
task->state = READ_MESSAGE;
}
else {
task->last_error = "Unknown statfile";
task->error_code = RSPAMD_STATFILE_ERROR;
task->state = WRITE_ERROR;
return FALSE;
}
}
else {
rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_CHARACTER, task->content_length);
task->state = READ_MESSAGE;
task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_t));
}
case 'r':
case 'R':
if (g_ascii_strcasecmp (headern, RCPT_HEADER) == 0) {
tmp = rspamd_protocol_escape_braces (h->value);
task->rcpt = g_list_prepend (task->rcpt, tmp);
debug_task ("read rcpt header, value: %s", tmp);
}
else if (task->cmd != CMD_LEARN && task->cmd != CMD_OTHER) {
rspamd_set_dispatcher_policy (task->dispatcher, BUFFER_ANY, 0);
task->state = READ_MESSAGE;
task->msg = memory_pool_alloc0 (task->task_pool, sizeof (f_str_t));
else if (g_ascii_strcasecmp (headern, NRCPT_HEADER) == 0) {
task->nrcpt = strtoul (h->value->str, &err, 10);
debug_task ("read rcpt header, value: %d", (gint)task->nrcpt);
}
else {
task->last_error = "Unknown content length";
task->error_code = RSPAMD_LENGTH_ERROR;
task->state = WRITE_ERROR;
return FALSE;
msg_info ("wrong header: %s", headern);
res = FALSE;
}
}
return TRUE;
}
headern = separate_command (line, ':');
if (line == NULL || headern == NULL) {
return FALSE;
}
/* Eat whitespaces */
g_strstrip (headern);
fstrstrip (line);
switch (headern[0]) {
case 'c':
case 'C':
/* content-length */
if (g_ascii_strncasecmp (headern, CONTENT_LENGTH_HEADER, sizeof (CONTENT_LENGTH_HEADER) - 1) == 0) {
if (task->content_length == 0) {
tmp = memory_pool_fstrdup (task->task_pool, line);
task->content_length = strtoul (tmp, &err, 10);
debug_task ("read Content-Length header, value: %ul", (guint32)task->content_length);
}
}
else {
msg_info ("wrong header: %s", headern);
res = FALSE;
}
break;
case 'd':
case 'D':
/* Deliver-To */
if (g_ascii_strncasecmp (headern, DELIVER_TO_HEADER, sizeof (DELIVER_TO_HEADER) - 1) == 0) {
task->deliver_to = escape_braces_addr_fstr (task->task_pool, line);
debug_task ("read deliver-to header, value: %s", task->deliver_to);
}
else {
msg_info ("wrong header: %s", headern);
res = FALSE;
}
break;
case 'h':
case 'H':
/* helo */
if (g_ascii_strncasecmp (headern, HELO_HEADER, sizeof (HELO_HEADER) - 1) == 0) {
task->helo = memory_pool_fstrdup (task->task_pool, line);
debug_task ("read helo header, value: %s", task->helo);
}
else if (g_ascii_strncasecmp (headern, HOSTNAME_HEADER, sizeof (HOSTNAME_HEADER) - 1) == 0) {
task->hostname = memory_pool_fstrdup (task->task_pool, line);
debug_task ("read hostname header, value: %s", task->hostname);
}
else {
msg_info ("wrong header: %s", headern);
res = FALSE;
}
break;
case 'f':
case 'F':
/* from */
if (g_ascii_strncasecmp (headern, FROM_HEADER, sizeof (FROM_HEADER) - 1) == 0) {
task->from = escape_braces_addr_fstr (task->task_pool, line);
debug_task ("read from header, value: %s", task->from);
}
else {
msg_info ("wrong header: %s", headern);
res = FALSE;
}
break;
case 'j':
case 'J':
/* json */
if (g_ascii_strncasecmp (headern, JSON_HEADER, sizeof (JSON_HEADER) - 1) == 0) {
task->is_json = parse_flag (memory_pool_fstrdup (task->task_pool, line));
}
else {
msg_info ("wrong header: %s", headern);
res = FALSE;
}
break;
case 'q':
case 'Q':
/* Queue id */
if (g_ascii_strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) {
task->queue_id = memory_pool_fstrdup (task->task_pool, line);
debug_task ("read queue_id header, value: %s", task->queue_id);
}
else {
msg_info ("wrong header: %s", headern);
res = FALSE;
}
break;
case 'r':
case 'R':
/* rcpt */
if (g_ascii_strncasecmp (headern, RCPT_HEADER, sizeof (RCPT_HEADER) - 1) == 0) {
tmp = escape_braces_addr_fstr (task->task_pool, line);
task->rcpt = g_list_prepend (task->rcpt, tmp);
debug_task ("read rcpt header, value: %s", tmp);
}
else if (g_ascii_strncasecmp (headern, NRCPT_HEADER, sizeof (NRCPT_HEADER) - 1) == 0) {
tmp = memory_pool_fstrdup (task->task_pool, line);
task->nrcpt = strtoul (tmp, &err, 10);
debug_task ("read rcpt header, value: %d", (gint)task->nrcpt);
}
else {
msg_info ("wrong header: %s", headern);
res = FALSE;
}
break;
case 'i':
case 'I':
/* ip_addr */
if (g_ascii_strncasecmp (headern, IP_ADDR_HEADER, sizeof (IP_ADDR_HEADER) - 1) == 0) {
tmp = memory_pool_fstrdup (task->task_pool, line);
break;
case 'i':
case 'I':
if (g_ascii_strcasecmp (headern, IP_ADDR_HEADER) == 0) {
tmp = h->value->str;
#ifdef HAVE_INET_PTON
if (g_ascii_strncasecmp (tmp, "IPv6:", 5) == 0) {
if (inet_pton (AF_INET6, tmp + 6, &task->from_addr.d.in6) == 1) {
task->from_addr.ipv6 = TRUE;
}
else {
msg_err ("bad ip header: '%s'", tmp);
return FALSE;
}
task->from_addr.has_addr = TRUE;
}
else {
if (inet_pton (AF_INET, tmp, &task->from_addr.d.in4) != 1) {
/* Try ipv6 */
if (inet_pton (AF_INET6, tmp, &task->from_addr.d.in6) == 1) {
if (g_ascii_strncasecmp (tmp, "IPv6:", 5) == 0) {
if (inet_pton (AF_INET6, tmp + 6, &task->from_addr.d.in6) == 1) {
task->from_addr.ipv6 = TRUE;
}
else {
msg_err ("bad ip header: '%s'", tmp);
return FALSE;
}
task->from_addr.has_addr = TRUE;
}
else {
task->from_addr.ipv6 = FALSE;
if (inet_pton (AF_INET, tmp, &task->from_addr.d.in4) != 1) {
/* Try ipv6 */
if (inet_pton (AF_INET6, tmp, &task->from_addr.d.in6) == 1) {
task->from_addr.ipv6 = TRUE;
}
else {
msg_err ("bad ip header: '%s'", tmp);
return FALSE;
}
}
else {
task->from_addr.ipv6 = FALSE;
}
task->from_addr.has_addr = TRUE;
}
task->from_addr.has_addr = TRUE;
}
#else
if (!inet_aton (tmp, &task->from_addr)) {
msg_err ("bad ip header: '%s'", tmp);
return FALSE;
}
if (!inet_aton (tmp, &task->from_addr)) {
msg_err ("bad ip header: '%s'", tmp);
return FALSE;
}
#endif
debug_task ("read IP header, value: %s", tmp);
}
else {
msg_info ("wrong header: %s", headern);
res = FALSE;
}
break;
case 'p':
case 'P':
/* Pass header */
if (g_ascii_strncasecmp (headern, PASS_HEADER, sizeof (PASS_HEADER) - 1) == 0) {
if (line->len == sizeof ("all") - 1 && g_ascii_strncasecmp (line->begin, "all", sizeof ("all") - 1) == 0) {
task->pass_all_filters = TRUE;
msg_info ("pass all filters");
}
}
else {
res = FALSE;
}
break;
case 's':
case 'S':
if (g_ascii_strncasecmp (headern, SUBJECT_HEADER, sizeof (SUBJECT_HEADER) - 1) == 0) {
task->subject = memory_pool_fstrdup (task->task_pool, line);
}
else if (g_ascii_strncasecmp (headern, STATFILE_HEADER, sizeof (STATFILE_HEADER) - 1) == 0) {
task->statfile = memory_pool_fstrdup (task->task_pool, line);
}
else {
res = FALSE;
}
break;
case 'u':
case 'U':
if (g_ascii_strncasecmp (headern, USER_HEADER, sizeof (USER_HEADER) - 1) == 0) {
task->user = memory_pool_fstrdup (task->task_pool, line);
}
else {
debug_task ("read IP header, value: %s", tmp);
}
else {
debug_task ("wrong header: %s", headern);
res = FALSE;
}
break;
case 'p':
case 'P':
if (g_ascii_strcasecmp (headern, PASS_HEADER) == 0) {
if (h->value->len == sizeof ("all") - 1 &&
g_ascii_strcasecmp (h->value->str, "all") == 0) {
task->pass_all_filters = TRUE;
debug_task ("pass all filters");
}
}
else {
res = FALSE;
}
break;
case 's':
case 'S':
if (g_ascii_strcasecmp (headern, SUBJECT_HEADER) == 0) {
task->subject = h->value->str;
}
else {
res = FALSE;
}
break;
case 'u':
case 'U':
if (g_ascii_strcasecmp (headern, USER_HEADER) == 0) {
task->user = h->value->str;
}
else {
res = FALSE;
}
break;
default:
debug_task ("wrong header: %s", headern);
res = FALSE;
break;
}
break;
default:
msg_info ("wrong header: %s", headern);
res = FALSE;
break;
}
if (!res && task->cfg->strict_protocol_headers) {
msg_err ("deny processing of a request with incorrect or unknown headers");
task->last_error = "invalid header";
task->error_code = 400;
return FALSE;
}
@ -663,18 +400,13 @@ parse_header (struct worker_task *task, f_str_t * line)
}
gboolean
read_rspamd_input_line (struct worker_task *task, f_str_t * line)
rspamd_protocol_handle_request (struct worker_task *task,
struct rspamd_http_message *msg)
{
switch (task->state) {
case READ_COMMAND:
return parse_command (task, line);
break;
case READ_HEADER:
return parse_header (task, line);
break;
default:
return FALSE;
if (rspamd_protocol_handle_url (task, msg)) {
return rspamd_protocol_handle_headers (task, msg);
}
return FALSE;
}
@ -878,7 +610,17 @@ rspamd_metric_result_ucl (struct worker_task *task, struct metric_result *mres,
if (logbuf->str[logbuf->len - 1] == ',') {
logbuf->len --;
}
rspamd_printf_gstring (logbuf, "]), ");
#ifdef HAVE_CLOCK_GETTIME
rspamd_printf_gstring (logbuf, "]), len: %z, time: %s, dns req: %d,",
task->msg->len, calculate_check_time (&task->tv, &task->ts,
task->cfg->clock_res, &task->scan_milliseconds), task->dns_requests);
#else
rspamd_printf_gstring (logbuf, "]), len: %z, time: %s, dns req: %d,",
task->msg->len,
calculate_check_time (&task->tv, task->cfg->clock_res, &task->scan_milliseconds),
task->dns_requests);
#endif
return obj;
}
@ -992,10 +734,12 @@ write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
write_hashes_to_log (task, logbuf);
msg_info ("%v", logbuf);
g_string_free (logbuf, TRUE);
msg->body = g_string_sized_new (BUFSIZ);
func.ud = msg->body;
ucl_object_emit_full (top, UCL_EMIT_JSON_COMPACT, &func);
ucl_object_unref (top);
/* Increase counters */
task->worker->srv->stat->messages_scanned++;
@ -1006,7 +750,7 @@ write_check_reply (struct rspamd_http_message *msg, struct worker_task *task)
}
gboolean
write_reply (struct worker_task *task)
rspamd_protocol_write_reply (struct worker_task *task)
{
struct rspamd_http_message *msg;
@ -1038,13 +782,6 @@ write_reply (struct worker_task *task)
"text/plain", task, task->sock, &task->tv, task->ev_base);
task->state = CLOSING_CONNECTION;
break;
case CMD_LEARN:
msg->code = task->error_code;
rspamd_http_connection_write_message (task->http_conn, msg, NULL,
"text/plain", task, task->sock, &task->tv, task->ev_base);
task->state = CLOSING_CONNECTION;
return TRUE;
break;
case CMD_OTHER:
task->state = CLOSING_CONNECTION;
return task->custom_cmd->func (task);

42
src/protocol.h

@ -8,6 +8,7 @@
#include "config.h"
#include "filter.h"
#include "http.h"
#define RSPAMD_FILTER_ERROR 1
#define RSPAMD_NETWORK_ERROR 2
@ -15,27 +16,9 @@
#define RSPAMD_LENGTH_ERROR 4
#define RSPAMD_STATFILE_ERROR 5
#define RSPAMC_PROTO_1_0 "1.0"
#define RSPAMC_PROTO_1_1 "1.1"
#define RSPAMC_PROTO_1_2 "1.2"
#define RSPAMC_PROTO_1_3 "1.3"
/*
* Reply messages
*/
#define RSPAMD_REPLY_BANNER "RSPAMD"
#define SPAMD_REPLY_BANNER "SPAMD"
#define SPAMD_OK "EX_OK"
/* XXX: try to convert rspamd errors to spamd errors */
#define SPAMD_ERROR "EX_ERROR"
struct worker_task;
struct metric;
/* Wire protocol used by a client connection (stored in worker_task.proto) */
enum rspamd_protocol {
/* selected when the command line carries the spamc greeting */
SPAMC_PROTO,
/* selected when the command line carries the rspamc greeting */
RSPAMC_PROTO,
};
enum rspamd_command {
CMD_CHECK,
@ -45,8 +28,7 @@ enum rspamd_command {
CMD_SKIP,
CMD_PING,
CMD_PROCESS,
CMD_LEARN,
CMD_OTHER,
CMD_OTHER
};
@ -58,27 +40,19 @@ struct custom_command {
};
/**
* Find the separator character in the input string and return a pointer to the first part of the string; the input is modified to point to the second part of the string
* @param in f_str_t input
* @param c separator character
* @return pointer to the first part of string or NULL if there is no separator found
*/
gchar* separate_command (f_str_t * in, gchar c);
/**
* Read one line of user's input for specified task
* @param task task object
* @param line line of user's input
* @return 0 if line was successfully parsed and -1 if we have protocol error
* Process HTTP request to the task structure
* @param task
* @param msg
* @return
*/
gboolean read_rspamd_input_line (struct worker_task *task, f_str_t *line);
gboolean rspamd_protocol_handle_request (struct worker_task *task, struct rspamd_http_message *msg);
/**
* Write reply for specified task command
* @param task task object
* @return 0 if we wrote reply and -1 if there was some error
*/
gboolean write_reply (struct worker_task *task) G_GNUC_WARN_UNUSED_RESULT;
gboolean rspamd_protocol_write_reply (struct worker_task *task);
/**

23
src/util.c

@ -1409,29 +1409,6 @@ compare_url_func (gconstpointer a, gconstpointer b)
return r;
}
/*
 * Strip <> from an email address.
 *
 * Skips leading whitespace and '<' characters, then copies everything up to
 * the first whitespace or '>' character (or the end of the input) into a
 * NUL-terminated string allocated from the pool.
 *
 * @param pool memory pool used to allocate the result
 * @param in input string described by begin/len
 * @return newly allocated, NUL-terminated copy of the stripped address
 */
gchar *
escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in)
{
	const gchar *end = in->begin + in->len;
	gchar *res, *start, *p;
	gsize len;

	/*
	 * The bounds are tested before each dereference, so the byte located
	 * just after the input is never read (the input is delimited by len).
	 */
	start = in->begin;
	while (start < end && (g_ascii_isspace (*start) || *start == '<')) {
		start ++;
	}

	p = start;
	while (p < end && !g_ascii_isspace (*p) && *p != '>') {
		p ++;
	}
	len = p - start;

	/* One extra byte for the terminating NUL written by rspamd_strlcpy */
	res = memory_pool_alloc (pool, len + 1);
	rspamd_strlcpy (res, start, len + 1);

	return res;
}
/*
* Find the first occurrence of find in s, ignore case.
*/

5
src/util.h

@ -218,11 +218,6 @@ gsize rspamd_strlcpy (gchar *dst, const gchar *src, gsize siz);
*/
gsize rspamd_strlcpy_tolower (gchar *dst, const gchar *src, gsize siz);
/*
* Strip <> from email address
*/
gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in);
/*
* Convert milliseconds to timeval fields
*/

288
src/worker.c

@ -89,8 +89,6 @@ struct rspamd_worker_ctx {
struct event_base *ev_base;
};
static gboolean write_socket (void *arg);
static sig_atomic_t wanna_die = 0;
#ifndef HAVE_SA_SIGINFO
@ -155,242 +153,6 @@ sigusr1_handler (gint fd, short what, void *arg)
return;
}
# if 0
/*
 * Callback that is called when there is data to read in the buffer.
 *
 * Dispatches on task->state:
 *  - READ_COMMAND / READ_HEADER: the input is passed to
 *    read_rspamd_input_line (); a failure switches the task to WRITE_ERROR
 *    (with a default "Read error" when no error text was set). If the task
 *    ends up in WRITE_REPLY or WRITE_ERROR, write_socket () is called at once.
 *  - READ_MESSAGE: the message body is collected, process_message () is run
 *    and then the task either replies immediately (CMD_OTHER), learns
 *    (CMD_LEARN), or goes through process_filters () / lua_call_pre_filters ().
 *  - WRITE_REPLY / WRITE_ERROR: delegated to write_socket ().
 *  - WAIT_FILTER / WAIT_POST_FILTER / WAIT_PRE_FILTER: the input is logged
 *    and ignored.
 *
 * @param in chunk of input data
 * @param arg the struct worker_task served by this callback (passed as void *)
 * @return the result of write_socket () on the paths that call it, TRUE otherwise
 */
static gboolean
read_socket (f_str_t * in, void *arg)
{
struct worker_task *task = (struct worker_task *) arg;
struct rspamd_worker_ctx *ctx;
ssize_t r;
GError *err = NULL;
ctx = task->worker->ctx;
switch (task->state) {
case READ_COMMAND:
case READ_HEADER:
if (!read_rspamd_input_line (task, in)) {
/* Keep an error set by the parser, otherwise report a generic read error */
if (!task->last_error) {
task->last_error = "Read error";
task->error_code = RSPAMD_NETWORK_ERROR;
}
task->state = WRITE_ERROR;
}
/* The parser itself may also have moved the task to a write state */
if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) {
return write_socket (task);
}
break;
case READ_MESSAGE:
/* Allow half-closed connections to proceed */
debug_task ("got string of length %z", task->msg->len);
if (task->content_length > 0) {
/* Known length: the message refers to the input buffer directly, no copy is made */
task->msg->begin = in->begin;
task->msg->len = in->len;
task->state = WAIT_FILTER;
task->dispatcher->want_read = FALSE;
}
else {
/* Unknown length: accumulate chunks until a zero-length input arrives */
task->dispatcher->want_read = FALSE;
if (in->len > 0) {
if (task->msg->begin == NULL) {
/* Allocate the buffer on the first chunk */
task->msg->size = MAX (BUFSIZ, in->len);
task->msg->begin = g_malloc (task->msg->size);
memcpy (task->msg->begin, in->begin, in->len);
task->msg->len = in->len;
}
else if (task->msg->size >= task->msg->len + in->len) {
/* The chunk fits into the already allocated buffer */
memcpy (task->msg->begin + task->msg->len, in->begin, in->len);
task->msg->len += in->len;
}
else {
/* Need to realloc: grow to at least double the size or enough for this chunk */
task->msg->size = MAX (task->msg->size * 2, task->msg->size + in->len);
task->msg->begin = g_realloc (task->msg->begin, task->msg->size);
memcpy (task->msg->begin + task->msg->len, in->begin, in->len);
task->msg->len += in->len;
}
/* Want more */
return TRUE;
}
else if (task->msg->len > 0) {
/* Input is finished: the accumulated buffer is freed together with the task pool */
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_free, task->msg->begin);
}
else {
/* Zero-length input and nothing accumulated so far */
msg_warn ("empty message passed");
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
return write_socket (task);
}
}
/* The whole message is available at this point */
r = process_message (task);
if (r == -1) {
msg_warn ("processing of message failed");
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
return write_socket (task);
}
if (task->cmd == CMD_OTHER) {
/* Skip filters */
task->state = WRITE_REPLY;
return write_socket (task);
}
else if (task->cmd == CMD_LEARN) {
/* The outcome of learning is reported through last_error / error_code */
if (!learn_task (task->statfile, task, &err)) {
task->last_error = memory_pool_strdup (task->task_pool, err->message);
task->error_code = err->code;
g_error_free (err);
task->state = WRITE_ERROR;
}
else {
task->last_error = "learn ok";
task->error_code = 0;
task->state = WRITE_REPLY;
}
return write_socket (task);
}
else {
if (task->cfg->pre_filters == NULL) {
/* No pre-filters are configured: run the filters right away */
r = process_filters (task);
if (r == -1) {
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
return write_socket (task);
}
/* Add the task to the classify pool */
if (!task->is_skipped && ctx->classify_pool) {
register_async_thread (task->s);
g_thread_pool_push (ctx->classify_pool, task, &err);
if (err != NULL) {
msg_err ("cannot pull task to the pool: %s", err->message);
remove_async_thread (task->s);
}
}
if (task->is_skipped) {
/* Call write_socket to write reply and exit */
return write_socket (task);
}
}
else {
lua_call_pre_filters (task);
/* We want fin_task after pre filters are processed */
task->s->wanna_die = TRUE;
task->state = WAIT_PRE_FILTER;
check_session_pending (task->s);
}
}
break;
case WRITE_REPLY:
case WRITE_ERROR:
return write_socket (task);
break;
case WAIT_FILTER:
case WAIT_POST_FILTER:
case WAIT_PRE_FILTER:
/* Input that arrives while the task is in a waiting state is not processed */
msg_info ("ignoring trailing garbadge of size %z", in->len);
break;
default:
debug_task ("invalid state on reading stage");
break;
}
return TRUE;
}
/*
 * Callback for socket writing.
 *
 * Acts according to task->state:
 *  - WRITE_REPLY / WRITE_ERROR: mark the task as WRITING_REPLY and call
 *    write_reply (); the session is destroyed only if write_reply () succeeds.
 *  - CLOSING_CONNECTION: destroy the session.
 *  - WAIT_PRE_FILTER: move to WAIT_FILTER and run process_filters ().
 *  - WRITING_REPLY / WAIT_FILTER / WAIT_POST_FILTER: nothing is done.
 *  - anything else: log the state and destroy the session.
 *
 * @param arg the struct worker_task to handle (passed as void *)
 * @return FALSE on the paths that reply or close, TRUE otherwise
 */
static gboolean
write_socket (void *arg)
{
	struct worker_task *task = arg;
	struct rspamd_worker_ctx *ctx = task->worker->ctx;
	GError *err = NULL;
	gint res;

	switch (task->state) {
	case WRITE_REPLY:
	case WRITE_ERROR:
		/* Replies and errors are written in exactly the same way */
		task->state = WRITING_REPLY;
		if (write_reply (task)) {
			destroy_session (task->s);
		}
		return FALSE;

	case CLOSING_CONNECTION:
		debug_task ("normally closing connection");
		destroy_session (task->s);
		return FALSE;

	case WRITING_REPLY:
	case WAIT_FILTER:
	case WAIT_POST_FILTER:
		/* Nothing to do in these states */
		break;

	case WAIT_PRE_FILTER:
		task->state = WAIT_FILTER;
		res = process_filters (task);
		if (res == -1) {
			task->last_error = "Filter processing error";
			task->error_code = RSPAMD_FILTER_ERROR;
			task->state = WRITE_ERROR;
			return write_socket (task);
		}

		/* Hand the task over to the classify pool when one is configured */
		if (!task->is_skipped && ctx->classify_pool) {
			register_async_thread (task->s);
			g_thread_pool_push (ctx->classify_pool, task, &err);
			if (err != NULL) {
				msg_err ("cannot pull task to the pool: %s", err->message);
				remove_async_thread (task->s);
			}
		}

		if (task->is_skipped) {
			/* Re-enter to write the reply and finish */
			return write_socket (task);
		}
		break;

	default:
		msg_info ("abnormally closing connection at state: %d", task->state);
		destroy_session (task->s);
		return FALSE;
	}

	return TRUE;
}
/*
 * Called if something goes wrong: logs the client address and the error
 * message, releases the error object and destroys the task session.
 *
 * @param err error describing the failure; it is freed here
 * @param arg the struct worker_task affected (passed as void *)
 */
static void
err_socket (GError * err, void *arg)
{
	struct worker_task *task = arg;

	msg_info ("abnormally closing connection from: %s, error: %s",
		inet_ntoa (task->client_addr),
		err->message);

	/* Release the error object before tearing the session down */
	g_error_free (err);
	destroy_session (task->s);
}
#endif
/*
* Called if all filters are processed
*/
@ -410,7 +172,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
write_reply (task);
rspamd_protocol_write_reply (task);
}
return TRUE;
}
@ -443,7 +205,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
write_reply (task);
rspamd_protocol_write_reply (task);
}
}
else {
@ -455,7 +217,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
write_reply (task);
rspamd_protocol_write_reply (task);
}
}
else {
@ -465,7 +227,7 @@ fin_task (void *arg)
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
write_reply (task);
rspamd_protocol_write_reply (task);
}
/* Add task to the classify pool */
if (!task->is_skipped && ctx->classify_pool) {
@ -478,7 +240,7 @@ fin_task (void *arg)
}
}
if (task->is_skipped) {
write_reply (task);
rspamd_protocol_write_reply (task);
}
}
}
@ -510,7 +272,7 @@ reduce_tasks_count (gpointer arg)
(*tasks) --;
}
static gboolean
static gint
rspamd_worker_body_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
const gchar *chunk, gsize len)
@ -524,7 +286,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
if (msg->body->len == 0) {
msg_err ("got zero length body, cannot continue");
return FALSE;
return 0;
}
if (!rspamd_protocol_handle_request (task, msg)) {
task->state = WRITE_ERROR;
return 0;
}
task->msg = msg->body;
@ -537,26 +304,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
return FALSE;
return 0;
}
if (task->cmd == CMD_OTHER) {
/* Skip filters */
task->state = WRITE_REPLY;
return FALSE;
}
else if (task->cmd == CMD_LEARN) {
if (!learn_task (task->statfile, task, &err)) {
task->last_error = memory_pool_strdup (task->task_pool, err->message);
task->error_code = err->code;
g_error_free (err);
task->state = WRITE_ERROR;
}
else {
task->last_error = "learn ok";
task->error_code = 0;
task->state = WRITE_REPLY;
}
return FALSE;
return 0;
}
else {
if (task->cfg->pre_filters == NULL) {
@ -565,7 +318,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
return FALSE;
return 0;
}
/* Add task to the classify pool */
if (!task->is_skipped && ctx->classify_pool) {
@ -578,7 +331,8 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
}
if (task->is_skipped) {
/* Call write_socket to write reply and exit */
return TRUE;
task->state = WRITE_REPLY;
return 0;
}
}
else {
@ -589,7 +343,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
check_session_pending (task->s);
}
}
return TRUE;
return 0;
}
static void
@ -611,6 +365,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
destroy_session (task->s);
}
else {
check_session_pending (task->s);
}
}
/*
@ -667,7 +424,6 @@ accept_socket (gint fd, short what, void *arg)
new_task->sock = nfd;
new_task->is_mime = ctx->is_mime;
new_task->is_json = ctx->is_json;
new_task->is_http = ctx->is_http;
new_task->allow_learn = ctx->allow_learn;
worker->srv->stat->connections_count++;
@ -779,7 +535,7 @@ start_worker (struct rspamd_worker *worker)
event_base_loop (ctx->ev_base, 0);
g_mime_shutdown ();
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
}

Loading…
Cancel
Save