Browse Source

* Fix redirector connection procedure

* Add more strict login
* Add new header Queue-ID to protocol
* Log message id or queue id
* Add config file for redirector
* Add ability to set regexp and domains list to check with redirector
rspamd-0.5
Vsevolod Stakhov 17 years ago
parent
commit
e7017a518b
  1. 1
      src/main.h
  2. 5
      src/message.c
  3. 24
      src/plugins/surbl.c
  4. 13
      src/protocol.c
  5. 9
      src/worker.c
  6. 49
      utils/redirector.pl.in

1
src/main.h

@ -174,6 +174,7 @@ struct worker_task {
int sock; /**< socket descriptor */
char *helo; /**< helo header value */
char *from; /**< from header value */
char *queue_id; /**< queue id if specified */
GList *rcpt; /**< recipients list */
unsigned int nrcpt; /**< number of recipients */
struct in_addr from_addr; /**< client addr in numeric form */

5
src/message.c

@ -362,7 +362,10 @@ process_message (struct worker_task *task)
g_mime_message_foreach_part (message, mime_foreach_callback, task);
#endif
msg_info ("process_message: found %d parts in message", task->parts_count);
msg_debug ("process_message: found %d parts in message", task->parts_count);
if (task->queue_id == NULL) {
task->queue_id = (char *)g_mime_message_get_message_id (task->message);
}
task->worker->srv->stat->messages_scanned ++;

24
src/plugins/surbl.c

@ -386,11 +386,13 @@ dns_callback (int result, char type, int count, int ttl, void *addresses, void *
*(param->url->host + param->url->hostlen) = 0;
/* If we have result from DNS server, this url exists in SURBL, so increase score */
if (result == DNS_ERR_NONE && type == DNS_IPv4_A) {
msg_info ("surbl_check: url %s is in surbl %s", param->url->host, param->suffix->suffix);
msg_info ("surbl_check: <%s> url %s is in surbl %s",
param->task->queue_id, param->url->host, param->suffix->suffix);
process_dns_results (param->task, param->suffix, param->url->host, (uint32_t)(((in_addr_t *)addresses)[0]));
}
else {
msg_debug ("surbl_check: url %s is not in surbl %s", param->url->host, param->suffix->suffix);
msg_debug ("surbl_check: <%s> url %s is not in surbl %s",
param->task->queue_id, param->url->host, param->suffix->suffix);
}
*(param->url->host + param->url->hostlen) = c;
@ -553,7 +555,8 @@ redirector_callback (int fd, short what, void *arg)
}
else {
event_del (&param->ev);
msg_info ("redirector_callback: connection to redirector timed out while waiting for write");
msg_info ("redirector_callback: <%s> connection to redirector timed out while waiting for write",
param->task->queue_id);
param->task->save.saved --;
make_surbl_requests (param->url, param->task, param->tree);
@ -577,7 +580,8 @@ redirector_callback (int fd, short what, void *arg)
}
}
if (*p == '\0') {
msg_info ("redirector_callback: got reply from redirector: '%s' -> '%s'", struri (param->url), c);
msg_info ("redirector_callback: <%s> got reply from redirector: '%s' -> '%s'",
param->task->queue_id, struri (param->url), c);
parse_uri (param->url, c, param->task->task_pool);
}
}
@ -592,7 +596,8 @@ redirector_callback (int fd, short what, void *arg)
}
else {
event_del (&param->ev);
msg_info ("redirector_callback: reading redirector timed out, while waiting for read");
msg_info ("redirector_callback: <%s> reading redirector timed out, while waiting for read",
param->task->queue_id);
param->task->save.saved --;
make_surbl_requests (param->url, param->task, param->tree);
if (param->task->save.saved == 0) {
@ -613,10 +618,11 @@ register_redirector_call (struct uri *url, struct worker_task *task, GTree *url_
struct redirector_param *param;
struct timeval timeout;
s = make_tcp_socket (&surbl_module_ctx->redirector_addr, htons (surbl_module_ctx->redirector_port), FALSE);
s = make_tcp_socket (&surbl_module_ctx->redirector_addr, surbl_module_ctx->redirector_port, FALSE);
if (s == -1) {
msg_info ("register_redirector_call: cannot create tcp socket failed: %s", strerror (errno));
msg_info ("register_redirector_call: <%s> cannot create tcp socket failed: %s",
task->queue_id, strerror (errno));
task->save.saved --;
make_surbl_requests (url, task, url_tree);
return;
@ -641,7 +647,7 @@ surbl_test_url (struct worker_task *task)
struct memcached_param *param;
GTree *url_tree;
url_tree = g_tree_new ((GCompareFunc)g_strcasecmp);
url_tree = g_tree_new ((GCompareFunc)g_ascii_strcasecmp);
TAILQ_FOREACH (url, &task->urls, next) {
msg_debug ("surbl_test_url: check url %s", struri (url));
@ -660,7 +666,7 @@ surbl_test_url (struct worker_task *task)
}
}
g_tree_destroy (url_tree);
memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_tree_destroy, url_tree);
return 0;
}

13
src/protocol.c

@ -77,6 +77,7 @@
#define IP_ADDR_HEADER "IP"
#define NRCPT_HEADER "Recipient-Number"
#define RCPT_HEADER "Rcpt"
#define QUEUE_ID_HEADER "Queue-ID"
#define ERROR_HEADER "Error"
/*
* Reply messages
@ -247,6 +248,18 @@ parse_header (struct worker_task *task, char *line)
return -1;
}
break;
case 'q':
case 'Q':
/* Queue id */
if (strncasecmp (headern, QUEUE_ID_HEADER, sizeof (QUEUE_ID_HEADER) - 1) == 0) {
task->queue_id = memory_pool_strdup (task->task_pool, line);
msg_debug ("parse_header: read queue_id header, value: %s", task->queue_id);
}
else {
msg_info ("parse_header: wrong header: %s", headern);
return -1;
}
break;
case 'r':
case 'R':
/* rcpt */

9
src/worker.c

@ -220,6 +220,7 @@ accept_socket (int fd, short what, void *arg)
{
struct rspamd_worker *worker = (struct rspamd_worker *)arg;
struct sockaddr_storage ss;
struct sockaddr_in *sin;
struct worker_task *new_task;
socklen_t addrlen = sizeof(ss);
int nfd;
@ -228,6 +229,14 @@ accept_socket (int fd, short what, void *arg)
msg_warn ("accept_socket: accept failed: %s", strerror (errno));
return;
}
if (ss.ss_family == AF_UNIX) {
msg_info ("accept_socket: accepted connection from unix socket");
}
else if (ss.ss_family == AF_INET) {
sin = (struct sockaddr_in *) &ss;
msg_info ("accept_socket: accepted connection from %s port %d", inet_ntoa (sin->sin_addr), ntohs (sin->sin_port));
}
new_task = g_malloc (sizeof (struct worker_task));

49
utils/redirector.pl.in

@ -42,10 +42,13 @@ my %cfg = (
logfile => '/var/log/rspamd-redirector.log',
do_log => 0,
debug => 0,
check_regexp => 'http://[^/]+/',
check_domains => [ 'narod.ru', 'test.ru' ],
digest_bits => 256,
cache_expire => 3600,
user => '@RSPAMD_USER@',
group => '@RSPAMD_GROUP@',
cfg_file => '@CMAKE_INSTALL_PREFIX@/etc/rspamd-redirector.conf',
);
our $do_reopen_log = 0;
@ -59,8 +62,8 @@ Proc::Daemon::Init if !$cfg{debug};
if ($> == 0) {
my $uid = getpwnam($cfg{user}) or die "user $cfg{user} unknown";
my $gid = getgrnam($cfg{group}) or die "group $cfg{group} unknown";
$< = $uid;
$) = $gid;
$< = $> = $uid;
$) = $( = $gid;
}
die "Cannot write to pidfile $cfg{pidfile}" if ! open(PID, "> $cfg{pidfile}");
@ -88,6 +91,7 @@ my $memd = new Cache::Memcached::Fast({
$SIG{USR1} = sub { $do_reopen_log = 1; };
$SIG{INT} = sub { $poe_kernel->stop(); };
$SIG{QUIT} = sub { $poe_kernel->stop(); };
$SIG{PIPE} = 'IGNORE';
write_log ("", "Starting URL resolver");
@ -104,6 +108,17 @@ POE::Component::Client::HTTP->spawn(
),
);
sub read_file {
my ($file) = @_;
open(IN, $file) or die "Can't open $file: $!";
local $/;
my $content = <IN>;
close IN;
return $content;
}
sub reopen_log {
if ($cfg{do_log}) {
close (LOG);
@ -403,6 +418,30 @@ sub process_input {
return;
}
my $domain;
if ($request->uri =~ /^http:\/\/([^\/]+)\//) {
my @parts = split(/\./, $1);
my $c1 = pop @parts;
my $c2 = pop @parts;
$domain = "$c2.$c1";
}
if ((defined($cfg{check_regexp}) && $request->uri !~ $cfg{check_regexp}) ||
(defined($cfg{check_domains}) && ($_ = grep(/$domain/, $cfg{check_domains})) == 0)) {
write_log ($heap->{remote_ip}, "Uri is not checked: " . $request->uri);
my $new_response = HTTP::Response->new(200);
$new_response->header("Uri", $request->uri);
$new_response->header("Connection", "close");
$new_response->header("Proxy-Connection", "close");
# Avoid sending the response if the client has gone away.
$heap->{client}->put($new_response) if defined $heap->{client};
$kernel->yield("shutdown");
# Shut down the client's connection when the response is sent.
return;
}
# Check cache first
my $redirect = memcached_check_url($request->uri);
if ($redirect) {
@ -426,6 +465,12 @@ sub process_input {
$kernel->post( "cl", "request", "got_response", $new_request, [0, ""]);
}
# Try to eval config file
if (-f $cfg{cfg_file}) {
my $config = read_file ($cfg{cfg_file});
eval $config;
}
POE::Component::Server::TCP->new
( Alias => "",
Port => $cfg{port},

Loading…
Cancel
Save