Browse Source

[Rework] Eliminate maps locking

pull/5512/head
Vsevolod Stakhov 5 months ago
parent
commit
529fb2025f
No known key found for this signature in database GPG Key ID: 7647B6790081437
  1. 88
      src/libserver/maps/map.c
  2. 2
      src/libserver/maps/map_private.h

88
src/libserver/maps/map.c

@ -109,7 +109,8 @@ write_http_request(struct http_callback_data *cbd)
}
if (cbd->data->etag) {
rspamd_http_message_add_header_len(msg, "If-None-Match",
cbd->data->etag->str, cbd->data->etag->len);
cbd->data->etag->str,
cbd->data->etag->len);
}
}
@ -295,21 +296,6 @@ rspamd_map_cache_cb(struct ev_loop *loop, ev_timer *w, int revents)
}
}
/*
* Unlocks the current backend if locked before switching to another backend
*/
static void
rspamd_map_unlock_current_backend(struct map_periodic_cbdata *cbd)
{
struct rspamd_map *map = cbd->map;
if (cbd->owned_lock && cbd->cur_backend < cbd->map->backends->len) {
g_atomic_int_set(&map->shared->locked, 0);
cbd->owned_lock = FALSE;
msg_debug_map("unlocked map %s before switching", map->name);
}
}
static inline time_t
rspamd_http_map_process_next_check(time_t now, time_t expires, time_t map_check_interval)
{
@ -348,10 +334,10 @@ http_map_finish(struct rspamd_http_connection *conn,
if (msg->code == 200) {
if (cbd->check) {
msg_info_map("need to reread map from %s", cbd->bk->uri);
msg_info_map("need to reread map from %s (reply code 200); "
"date timestamp: %z, last modified: %z",
cbd->bk->uri, (size_t) msg->date, (size_t) msg->last_modified);
cbd->periodic->need_modify = TRUE;
/* Unlock current backend before resetting */
rspamd_map_unlock_current_backend(cbd->periodic);
/* Reset the whole chain */
cbd->periodic->cur_backend = 0;
/* Reset cache, old cached data will be cleaned on timeout */
@ -374,7 +360,6 @@ http_map_finish(struct rspamd_http_connection *conn,
cbd->data->last_modified = msg->date;
}
/* Unsigned version - just open file */
cbd->shmem_data = rspamd_http_message_shmem_ref(msg);
cbd->data_len = msg->body_buf.len;
@ -543,8 +528,6 @@ http_map_finish(struct rspamd_http_connection *conn,
MAP_RELEASE(cbd->shmem_data, "shmem_data");
/* Unlock current backend before switching to next */
rspamd_map_unlock_current_backend(cbd->periodic);
cbd->periodic->cur_backend++;
munmap(in, dlen);
rspamd_map_process_periodic(cbd->periodic);
@ -589,21 +572,23 @@ http_map_finish(struct rspamd_http_connection *conn,
if (map->next_check) {
rspamd_http_date_format(next_check_date, sizeof(next_check_date),
map->next_check);
msg_info_map("data is not modified for server %s, next check at %s "
msg_info_map("data is not modified for server %s (%s), next check at %s "
"(http cache based: %T)",
cbd->data->host, next_check_date, expires_hdr);
cbd->data->host,
bk->uri,
next_check_date, expires_hdr);
}
else {
rspamd_http_date_format(next_check_date, sizeof(next_check_date),
rspamd_get_calendar_ticks() + map->poll_timeout);
msg_info_map("data is not modified for server %s, next check at %s "
msg_info_map("data is not modified for server %s (%s), next check at %s "
"(timer based)",
cbd->data->host, next_check_date);
cbd->data->host,
bk->uri,
next_check_date);
}
rspamd_map_update_http_cached_file(map, bk, cbd->data);
/* Unlock current backend before switching to next */
rspamd_map_unlock_current_backend(cbd->periodic);
cbd->periodic->cur_backend++;
rspamd_map_process_periodic(cbd->periodic);
}
@ -1052,20 +1037,13 @@ rspamd_map_periodic_dtor(struct map_periodic_cbdata *periodic)
/* Not modified */
}
if (periodic->owned_lock) {
if (periodic->cur_backend < map->backends->len) {
g_atomic_int_set(&map->shared->locked, 0);
msg_debug_map("unlocked map %s", map->name);
}
if (periodic->map->wrk->state == rspamd_worker_state_running) {
rspamd_map_schedule_periodic(periodic->map,
RSPAMD_SYMBOL_RESULT_NORMAL);
}
else {
msg_debug_map("stop scheduling periodics for %s; terminating state",
periodic->map->name);
}
if (periodic->map->wrk->state == rspamd_worker_state_running) {
rspamd_map_schedule_periodic(periodic->map,
RSPAMD_MAP_SCHEDULE_NORMAL);
}
else {
msg_debug_map("stop scheduling periodics for %s; terminating state",
periodic->map->name);
}
g_free(periodic);
@ -1801,8 +1779,6 @@ rspamd_map_common_http_callback(struct rspamd_map *map,
(int) data->last_modified,
(int) data->cache->last_modified);
periodic->need_modify = TRUE;
/* Reset the whole chain */
g_atomic_int_set(&map->shared->locked, 0);
periodic->cur_backend = 0;
rspamd_map_process_periodic(periodic);
}
@ -1813,7 +1789,6 @@ rspamd_map_common_http_callback(struct rspamd_map *map,
}
else {
/* Switch to the next backend */
g_atomic_int_set(&map->shared->locked, 0);
periodic->cur_backend++;
rspamd_map_process_periodic(periodic);
}
@ -2073,33 +2048,10 @@ rspamd_map_process_periodic(struct map_periodic_cbdata *cbd)
bk = g_ptr_array_index(map->backends, cbd->cur_backend);
if (!map->file_only && !cbd->owned_lock) {
if (!g_atomic_int_compare_and_exchange(&map->shared->locked,
0, 1)) {
msg_debug_map(
"don't try to reread map %s as it is locked by other process, "
"will reread it later",
cbd->map->name);
rspamd_map_schedule_periodic(map, RSPAMD_MAP_SCHEDULE_LOCKED);
MAP_RELEASE(cbd, "periodic");
return;
}
else {
msg_debug_map("locked map %s", map->name);
cbd->owned_lock = TRUE;
}
}
if (cbd->errored) {
/* We should not check other backends if some backend has failed*/
rspamd_map_schedule_periodic(cbd->map, RSPAMD_MAP_SCHEDULE_ERROR);
if (cbd->owned_lock) {
g_atomic_int_set(&map->shared->locked, 0);
cbd->owned_lock = FALSE;
}
/* Also set error flag for the map consumer */
cbd->cbdata.errored = true;

2
src/libserver/maps/map_private.h

@ -155,7 +155,6 @@ struct map_periodic_cbdata;
* Shared between workers
*/
struct rspamd_map_shared_data {
int locked;
int loaded;
int cached;
};
@ -212,7 +211,6 @@ struct map_periodic_cbdata {
ev_timer ev;
gboolean need_modify;
gboolean errored;
gboolean owned_lock;
unsigned int cur_backend;
ref_entry_t ref;
};

Loading…
Cancel
Save