diff options
Diffstat (limited to 'src/cache/udpqueue.c')
-rw-r--r-- | src/cache/udpqueue.c | 27 |
1 files changed, 20 insertions, 7 deletions
diff --git a/src/cache/udpqueue.c b/src/cache/udpqueue.c index 022df3d..8e1443a 100644 --- a/src/cache/udpqueue.c +++ b/src/cache/udpqueue.c @@ -2,6 +2,8 @@ #include <string.h> +#include <skalibs/allreadwrite.h> +#include <skalibs/tai.h> #include <skalibs/stralloc.h> #include <skalibs/genalloc.h> #include <skalibs/socket.h> @@ -9,12 +11,20 @@ #include "shibari-cache-internal.h" +void udpqueue_drop (udpqueue *q) +{ + q->storage.len = 0 ; + q->messages.len = 0 ; + tain_add_g(&q->deadline, &tain_infinite_relative) ; +} + int udpqueue_add4 (udpqueue *q, char const *ip, uint16_t port, char const *s, uint16_t len) { udp4msg msg = { .port = port, .len = len } ; if (!stralloc_readyplus(&q->storage, len)) return 0 ; memcpy(msg.ip, ip, 4) ; if (!genalloc_append(udp4msg, &q->messages, &msg)) return 0 ; + if (!q->storage.len) tain_add_g(&q->deadline, &g->wtto) ; stralloc_catb(&q->storage, s, len) ; return 1 ; } @@ -23,6 +33,7 @@ int udpqueue_flush4 (udpqueue *q) { size_t n = genalloc_n(udp4msg, &q->messages) ; size_t shead = 0, head = 0 ; + ssize_t r ; while (head < n) { udp4msg const *msg = genalloc_s(udp4msg, &q->messages) + head ; @@ -30,8 +41,7 @@ int udpqueue_flush4 (udpqueue *q) if (r <= 0) goto adjust ; shead += msg->len ; } - q->storage.len = 0 ; - genalloc_setlen(udp4msg, &q->messages, 0) ; + udpqueue_drop(q) ; return 1 ; adjust: @@ -39,7 +49,8 @@ int udpqueue_flush4 (udpqueue *q) q->storage.len -= shead ; memmove(genalloc_s(&udp4msg, &q->messages), genalloc_s(&udp4msg, &q->messages) + head, (n - head) * sizeof(udp4msg)) ; genalloc_setlen(&udp4msg, &q->messages, n - head) ; - return 0 ; + if (shead) tain_add_g(&q->deadline, &g->wtto) ; + return sanitize_read(r) ; } #ifdef SKALIBS_IPv6_ENABLED @@ -50,6 +61,7 @@ int udpqueue_add6 (udpqueue *q, char const *ip, uint16_t port, char const *s, ui if (!stralloc_readyplus(&q->storage, len)) return 0 ; memcpy(msg.ip, ip, 16) ; if (!genalloc_append(udp6msg, &q->messages, &msg)) return 0 ; + if (!q->storage.len) tain_add_g(&q->deadline, &g->wtto) ; stralloc_catb(&q->storage, s, len) ; return 1 ; } @@ -58,15 +70,15 @@ int udpqueue_flush6 (udpqueue *q) { size_t n = genalloc_n(udp6msg, &q->messages) ; size_t shead = 0, head = 0 ; + ssize_t r ; while (head < n) { udp6msg const *msg = genalloc_s(udp4msg, &q->messages) + head ; - ssize_t r = socket_send6(q->fd, q->storage.s + shead, msg->len, msg->ip, msg->port) ; + r = socket_send6(q->fd, q->storage.s + shead, msg->len, msg->ip, msg->port) ; if (r <= 0) goto adjust ; shead += msg->len ; } - q->storage.len = 0 ; - genalloc_setlen(udp6msg, &q->messages, 0) ; + udpqueue_drop(q) ; return 1 ; adjust: @@ -74,7 +86,8 @@ int udpqueue_flush6 (udpqueue *q) q->storage.len -= shead ; memmove(genalloc_s(&udp6msg, &q->messages), genalloc_s(&udp6msg, &q->messages) + head, (n - head) * sizeof(udp6msg)) ; genalloc_setlen(&udp6msg, &q->messages, n - head) ; - return 0 ; + if (shead) tain_add_g(&q->deadline, &g->wtto) ; + return sanitize_read(r) ; } #endif |