|
|
|
@ -177,23 +177,98 @@ sendfile_callback (rspamd_io_dispatcher_t *d) |
|
|
|
static gboolean |
|
|
|
write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) |
|
|
|
{ |
|
|
|
GList *cur; |
|
|
|
GList *cur = NULL, *tmp; |
|
|
|
GError *err = NULL; |
|
|
|
rspamd_buffer_t *buf; |
|
|
|
ssize_t r; |
|
|
|
|
|
|
|
/* Fix order */ |
|
|
|
if (d->out_buffers) { |
|
|
|
d->out_buffers = g_list_reverse (d->out_buffers); |
|
|
|
} |
|
|
|
cur = g_list_first (d->out_buffers); |
|
|
|
while (cur) { |
|
|
|
buf = (rspamd_buffer_t *) cur->data; |
|
|
|
if (BUFREMAIN (buf) == 0) { |
|
|
|
/* Skip empty buffers */ |
|
|
|
cur = g_list_next (cur); |
|
|
|
continue; |
|
|
|
struct iovec *iov; |
|
|
|
guint i, len, blen; |
|
|
|
|
|
|
|
len = g_queue_get_length (d->out_buffers); |
|
|
|
if (len > 1) { |
|
|
|
/* IOV version */ |
|
|
|
cur = d->out_buffers->tail; |
|
|
|
iov = g_slice_alloc (len * sizeof (struct iovec)); |
|
|
|
i = 0; |
|
|
|
while (cur) { |
|
|
|
buf = cur->data; |
|
|
|
blen = BUFREMAIN (buf); |
|
|
|
if (blen > 0) { |
|
|
|
iov[i].iov_base = buf->pos; |
|
|
|
iov[i].iov_len = blen; |
|
|
|
} |
|
|
|
else { |
|
|
|
iov[i].iov_base = NULL; |
|
|
|
iov[i].iov_len = 0; |
|
|
|
} |
|
|
|
i ++; |
|
|
|
cur = g_list_previous (cur); |
|
|
|
} |
|
|
|
/* Now try to write the whole vector */ |
|
|
|
r = writev (fd, iov, len); |
|
|
|
if (r == -1 && errno != EAGAIN) { |
|
|
|
g_slice_free1 (len * sizeof (struct iovec), iov); |
|
|
|
if (d->err_callback) { |
|
|
|
err = g_error_new (G_DISPATCHER_ERROR, errno, "%s", strerror (errno)); |
|
|
|
d->err_callback (err, d->user_data); |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
} |
|
|
|
else if (r > 0) { |
|
|
|
/* Find pos inside buffers */ |
|
|
|
cur = d->out_buffers->tail; |
|
|
|
i = 0; |
|
|
|
while (cur) { |
|
|
|
buf = cur->data; |
|
|
|
blen = BUFREMAIN (buf); |
|
|
|
if (r >= blen) { |
|
|
|
tmp = cur; |
|
|
|
cur = g_list_previous (cur); |
|
|
|
/* Mark this buffer as read */ |
|
|
|
g_queue_delete_link (d->out_buffers, tmp); |
|
|
|
r -= blen; |
|
|
|
} |
|
|
|
else { |
|
|
|
/* This buffer was not written completely */ |
|
|
|
buf->pos += r; |
|
|
|
break; |
|
|
|
} |
|
|
|
i ++; |
|
|
|
cur = g_list_previous (cur); |
|
|
|
} |
|
|
|
g_slice_free1 (len * sizeof (struct iovec), iov); |
|
|
|
if (cur != 0) { |
|
|
|
/* Wait for other event */ |
|
|
|
event_del (d->ev); |
|
|
|
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); |
|
|
|
event_base_set (d->ev_base, d->ev); |
|
|
|
event_add (d->ev, d->tv); |
|
|
|
return TRUE; |
|
|
|
} |
|
|
|
} |
|
|
|
else if (r == 0) { |
|
|
|
/* Got EOF while we wait for data */ |
|
|
|
g_slice_free1 (len * sizeof (struct iovec), iov); |
|
|
|
if (d->err_callback) { |
|
|
|
err = g_error_new (G_DISPATCHER_ERROR, EOF, "got EOF"); |
|
|
|
d->err_callback (err, d->user_data); |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
} |
|
|
|
else if (r == -1 && errno == EAGAIN) { |
|
|
|
g_slice_free1 (len * sizeof (struct iovec), iov); |
|
|
|
debug_ip("partially write data, retry"); |
|
|
|
/* Wait for other event */ |
|
|
|
event_del (d->ev); |
|
|
|
event_set (d->ev, fd, EV_WRITE, dispatcher_cb, (void *)d); |
|
|
|
event_base_set (d->ev_base, d->ev); |
|
|
|
event_add (d->ev, d->tv); |
|
|
|
return TRUE; |
|
|
|
} |
|
|
|
} |
|
|
|
else if (len == 1) { |
|
|
|
/* Single write version */ |
|
|
|
buf = d->out_buffers->head->data; |
|
|
|
r = write (fd, buf->pos, BUFREMAIN (buf)); |
|
|
|
if (r == -1 && errno != EAGAIN) { |
|
|
|
if (d->err_callback) { |
|
|
|
@ -207,7 +282,7 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) |
|
|
|
if (BUFREMAIN (buf) != 0) { |
|
|
|
/* Continue with this buffer */ |
|
|
|
debug_ip("wrote %z bytes of %z", r, buf->data->len); |
|
|
|
continue; |
|
|
|
return write_buffers (fd, d, is_delayed); |
|
|
|
} |
|
|
|
} |
|
|
|
else if (r == 0) { |
|
|
|
@ -227,13 +302,11 @@ write_buffers (gint fd, rspamd_io_dispatcher_t * d, gboolean is_delayed) |
|
|
|
event_add (d->ev, d->tv); |
|
|
|
return TRUE; |
|
|
|
} |
|
|
|
cur = g_list_next (cur); |
|
|
|
} |
|
|
|
|
|
|
|
if (cur == NULL) { |
|
|
|
/* Disable write event for this time */ |
|
|
|
g_list_free (d->out_buffers); |
|
|
|
d->out_buffers = NULL; |
|
|
|
g_queue_clear (d->out_buffers); |
|
|
|
|
|
|
|
debug_ip("all buffers were written successfully"); |
|
|
|
|
|
|
|
@ -472,7 +545,7 @@ dispatcher_cb (gint fd, short what, void *arg) |
|
|
|
sendfile_callback (d); |
|
|
|
} |
|
|
|
else { |
|
|
|
if (d->out_buffers == NULL) { |
|
|
|
if (g_queue_get_length (d->out_buffers) == 0) { |
|
|
|
event_del (d->ev); |
|
|
|
event_set (d->ev, fd, EV_READ | EV_PERSIST, dispatcher_cb, (void *)d); |
|
|
|
event_base_set (d->ev_base, d->ev); |
|
|
|
@ -526,6 +599,7 @@ rspamd_create_dispatcher (struct event_base *base, gint fd, enum io_policy polic |
|
|
|
new->ev = memory_pool_alloc0 (new->pool, sizeof (struct event)); |
|
|
|
new->fd = fd; |
|
|
|
new->ev_base = base; |
|
|
|
new->out_buffers = g_queue_new (); |
|
|
|
|
|
|
|
event_set (new->ev, fd, EV_WRITE, dispatcher_cb, (void *)new); |
|
|
|
event_base_set (new->ev_base, new->ev); |
|
|
|
@ -540,9 +614,7 @@ rspamd_remove_dispatcher (rspamd_io_dispatcher_t * dispatcher) |
|
|
|
if (dispatcher != NULL) { |
|
|
|
event_del (dispatcher->ev); |
|
|
|
memory_pool_delete (dispatcher->pool); |
|
|
|
if (dispatcher->out_buffers) { |
|
|
|
g_list_free (dispatcher->out_buffers); |
|
|
|
} |
|
|
|
g_queue_free (dispatcher->out_buffers); |
|
|
|
g_free (dispatcher); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -609,7 +681,7 @@ rspamd_dispatcher_write (rspamd_io_dispatcher_t * d, void *data, size_t len, gbo |
|
|
|
newbuf->pos = newbuf->data->begin; |
|
|
|
newbuf->data->len = len; |
|
|
|
|
|
|
|
d->out_buffers = g_list_prepend (d->out_buffers, newbuf); |
|
|
|
g_queue_push_head (d->out_buffers, newbuf); |
|
|
|
|
|
|
|
if (!delayed) { |
|
|
|
debug_ip("plan write event"); |
|
|
|
|