Browse Source

Implement listening for srv pipe in the main process

pull/444/head
Vsevolod Stakhov 10 years ago
parent
commit
e0b4ba6307
  1. 1
      src/libserver/rspamd_control.c
  2. 49
      src/libserver/worker_util.c
  3. 2
      src/libserver/worker_util.h
  4. 44
      src/rspamd.c

1
src/libserver/rspamd_control.c

@ -532,6 +532,7 @@ rspamd_srv_handler (gint fd, short what, gpointer ud)
spair = g_hash_table_lookup (srv->spairs, cmd.cmd.spair.pair_id);
if (spair == NULL) {
spair = g_malloc (sizeof (gint) * 2);
if (rspamd_socketpair (spair) == -1) {
rdata->rep.reply.spair.code = errno;
msg_err ("cannot create socket pair: %s", strerror (errno));

49
src/libserver/worker_util.c

@ -436,33 +436,34 @@ rspamd_worker_set_limits (struct rspamd_main *rspamd_main,
struct rspamd_worker *
rspamd_fork_worker (struct rspamd_main *rspamd_main,
struct rspamd_worker_conf *cf,
guint index)
guint index,
struct event_base *ev_base)
{
struct rspamd_worker *cur;
struct rspamd_worker *wrk;
gint rc;
/* Starting worker process */
cur = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker));
wrk = (struct rspamd_worker *) g_malloc0 (sizeof (struct rspamd_worker));
if (!rspamd_socketpair (cur->control_pipe)) {
if (!rspamd_socketpair (wrk->control_pipe)) {
msg_err ("socketpair failure: %s", strerror (errno));
exit (-errno);
}
if (!rspamd_socketpair (cur->srv_pipe)) {
if (!rspamd_socketpair (wrk->srv_pipe)) {
msg_err ("socketpair failure: %s", strerror (errno));
exit (-errno);
}
cur->srv = rspamd_main;
cur->type = cf->type;
cur->cf = g_malloc (sizeof (struct rspamd_worker_conf));
memcpy (cur->cf, cf, sizeof (struct rspamd_worker_conf));
cur->index = index;
cur->ctx = cf->ctx;
wrk->srv = rspamd_main;
wrk->type = cf->type;
wrk->cf = g_malloc (sizeof (struct rspamd_worker_conf));
memcpy (wrk->cf, cf, sizeof (struct rspamd_worker_conf));
wrk->index = index;
wrk->ctx = cf->ctx;
cur->pid = fork ();
wrk->pid = fork ();
switch (cur->pid) {
switch (wrk->pid) {
case 0:
/* Update pid for logging */
rspamd_log_update_pid (cf->type, rspamd_main->logger);
@ -484,7 +485,7 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
/* Do silent log reopen to avoid collisions */
rspamd_log_close (rspamd_main->logger);
rspamd_log_open (rspamd_main->logger);
cur->start_time = rspamd_get_calendar_ticks ();
wrk->start_time = rspamd_get_calendar_ticks ();
#if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION <= 30))
# if (GLIB_MINOR_VERSION > 20)
@ -498,10 +499,13 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
#endif
msg_info_main ("starting %s process %P", cf->worker->name, getpid ());
/* Close parent part of socketpair */
close (cur->control_pipe[0]);
rspamd_socket_nonblocking (cur->control_pipe[1]);
close (wrk->control_pipe[0]);
close (wrk->srv_pipe[0]);
rspamd_socket_nonblocking (wrk->control_pipe[1]);
rspamd_socket_nonblocking (wrk->srv_pipe[1]);
/* Execute worker */
cf->worker->worker_start_func (cur);
cf->worker->worker_start_func (wrk);
exit (EXIT_FAILURE);
break;
case -1:
msg_err_main ("cannot fork main process. %s", strerror (errno));
@ -510,15 +514,18 @@ rspamd_fork_worker (struct rspamd_main *rspamd_main,
break;
default:
/* Close worker part of socketpair */
close (cur->control_pipe[1]);
rspamd_socket_nonblocking (cur->control_pipe[0]);
close (wrk->control_pipe[1]);
close (wrk->srv_pipe[1]);
rspamd_socket_nonblocking (wrk->control_pipe[0]);
rspamd_socket_nonblocking (wrk->srv_pipe[0]);
rspamd_srv_start_watching (wrk, ev_base);
/* Insert worker into worker's table, pid is index */
g_hash_table_insert (rspamd_main->workers, GSIZE_TO_POINTER (
cur->pid), cur);
wrk->pid), wrk);
break;
}
return cur;
return wrk;
}
void

2
src/libserver/worker_util.h

@ -133,7 +133,7 @@ void rspamd_worker_block_signals (void);
* Fork new worker with the specified configuration
*/
struct rspamd_worker *rspamd_fork_worker (struct rspamd_main *,
struct rspamd_worker_conf *, guint);
struct rspamd_worker_conf *, guint idx, struct event_base *ev_base);
#define msg_err_main(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
rspamd_main->server_pool->tag.tagname, rspamd_main->server_pool->tag.uid, \

44
src/rspamd.c

@ -313,7 +313,8 @@ rspamd_fork_delayed_cb (gint signo, short what, gpointer arg)
struct waiting_worker *w = arg;
event_del (&w->wait_ev);
rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex);
rspamd_fork_worker (w->rspamd_main, w->cf, w->oldindex,
w->rspamd_main->ev_base);
g_slice_free1 (sizeof (*w), w);
}
@ -431,7 +432,7 @@ make_listen_key (struct rspamd_worker_bind_conf *cf)
}
static void
spawn_workers (struct rspamd_main *rspamd_main)
spawn_workers (struct rspamd_main *rspamd_main, struct event_base *ev_base)
{
GList *cur, *ls;
struct rspamd_worker_conf *cf;
@ -493,14 +494,14 @@ spawn_workers (struct rspamd_main *rspamd_main)
msg_warn_main ("cannot spawn more than 1 %s worker, so spawn one",
cf->worker->name);
}
rspamd_fork_worker (rspamd_main, cf, 0);
rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
}
else if (cf->worker->threaded) {
rspamd_fork_worker (rspamd_main, cf, 0);
rspamd_fork_worker (rspamd_main, cf, 0, ev_base);
}
else {
for (i = 0; i < cf->count; i++) {
rspamd_fork_worker (rspamd_main, cf, i);
rspamd_fork_worker (rspamd_main, cf, i, ev_base);
}
}
}
@ -553,6 +554,7 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
msg_info_main ("%s process %P terminated %s", g_quark_to_string (
w->type), w->pid,
WTERMSIG (res) == SIGKILL ? "hardly" : "softly");
event_del (&w->srv_ev);
g_free (w->cf);
g_free (w);
@ -664,7 +666,7 @@ rspamd_hup_handler (gint signo, short what, gpointer arg)
g_hash_table_foreach (rspamd_main->workers, kill_old_workers, NULL);
rspamd_map_remove_all (rspamd_main->cfg);
reread_config (rspamd_main);
spawn_workers (rspamd_main);
spawn_workers (rspamd_main, rspamd_main->ev_base);
}
static void
@ -731,6 +733,8 @@ rspamd_cld_handler (gint signo, short what, gpointer arg)
rspamd_fork_delayed (cur->cf, cur->index, rspamd_main);
}
event_del (&cur->srv_ev);
g_free (cur->cf);
g_free (cur);
}
else {
@ -783,6 +787,28 @@ rspamd_control_handler (gint fd, short what, gpointer arg)
rspamd_control_process_client_socket (rspamd_main, nfd);
}
static guint
rspamd_spair_hash (gconstpointer p)
{
return XXH64 (p, PAIR_ID_LEN, rspamd_hash_seed ());
}
static gboolean
rspamd_spair_equal (gconstpointer a, gconstpointer b)
{
return memcmp (a, b, PAIR_ID_LEN) == 0;
}
static void
rspamd_spair_close (gpointer p)
{
gint *fds = p;
close (fds[0]);
close (fds[1]);
g_free (p);
}
gint
main (gint argc, gchar **argv, gchar **env)
{
@ -806,6 +832,8 @@ main (gint argc, gchar **argv, gchar **env)
rspamd_main->stat = rspamd_mempool_alloc0_shared (rspamd_main->server_pool,
sizeof (struct rspamd_stat));
rspamd_main->cfg = rspamd_config_new ();
rspamd_main->spairs = g_hash_table_new_full (rspamd_spair_hash,
rspamd_spair_equal, g_free, rspamd_spair_close);
#ifndef HAVE_SETPROCTITLE
init_title (argc, argv, env);
@ -1009,7 +1037,6 @@ main (gint argc, gchar **argv, gchar **env)
#endif
/* Spawn workers */
rspamd_main->workers = g_hash_table_new (g_direct_hash, g_direct_equal);
spawn_workers (rspamd_main);
/* Init event base */
ev_base = event_init ();
@ -1035,6 +1062,8 @@ main (gint argc, gchar **argv, gchar **env)
event_base_set (ev_base, &usr1_ev);
event_add (&usr1_ev, NULL);
spawn_workers (rspamd_main, ev_base);
if (control_fd != -1) {
msg_info_main ("listening for control commands on %s",
rspamd_inet_address_to_string (control_addr));
@ -1073,6 +1102,7 @@ main (gint argc, gchar **argv, gchar **env)
/* Wait for workers termination */
g_hash_table_foreach_remove (rspamd_main->workers, wait_for_workers, NULL);
g_hash_table_unref (rspamd_main->spairs);
event_set (&term_ev, -1, EV_TIMEOUT|EV_PERSIST,
rspamd_final_term_handler, rspamd_main);

Loading…
Cancel
Save