From 8c9b0194063411882d3bbcec80d5e5c861d37544 Mon Sep 17 00:00:00 2001 From: Laurent Bercot Date: Wed, 24 Jul 2024 18:54:07 +0000 Subject: Add stuff to shibari-cache Signed-off-by: Laurent Bercot --- src/cache/cache.c | 32 ++- src/cache/conf.c | 20 +- src/cache/shibari-cache-internal.h | 77 ++++++- src/cache/shibari-cache.c | 439 +++++++++++++++++++++++++++++++------ src/cache/tcpconnection.c | 5 - src/cache/udpqueue.c | 27 ++- src/config/defaults.c | 7 +- src/config/lexparse.c | 62 ++++-- src/include/shibari/dcache.h | 9 +- src/libdcache/dcache_add.c | 8 +- src/libdcache/dcache_init.c | 1 + src/libdcache/dcache_load.c | 12 +- 12 files changed, 572 insertions(+), 127 deletions(-) delete mode 100644 src/cache/tcpconnection.c diff --git a/src/cache/cache.c b/src/cache/cache.c index 8c820fb..186450c 100644 --- a/src/cache/cache.c +++ b/src/cache/cache.c @@ -1,6 +1,36 @@ /* ISC license. */ +#include +#include +#include + #include #include "shibari-cache-internal.h" -dcache_t cache = DCACHE_ZERO ; +static dcache_t cache = DCACHE_ZERO ; + +void cache_init (uint64_t cachesize) +{ + dcache_init(&cache, cachesize) ; +} + +void cache_dump (void) +{ + if (g->dumpfile) + { + if (!dcache_save(&cache, g->dumpfile)) + { + strerr_warnwu2sys("save cache contents to ", g->dumpfile) ; + unlink_void(file) ; + } + } +} + +void cache_load (void) +{ + if (g->dumpfile) + { + if (!dcache_load(&cache, g->dumpfile)) + strerr_warnwu2sys("load cache contents from ", g->dumpfile) ; + } +} diff --git a/src/cache/conf.c b/src/cache/conf.c index ea35646..209fdc3 100644 --- a/src/cache/conf.c +++ b/src/cache/conf.c @@ -4,6 +4,7 @@ #include #include +#include #include #include "shibari-cache-internal.h" @@ -23,7 +24,7 @@ int conf_getb (cdb const *c, char const *key, size_t keylen, cdb_data *data) int conf_get (cdb const *c, char const *key, cdb_data *data) { - return conf_get(c, key, strlen(key), data) ; + return conf_getb(c, key, strlen(key), data) ; } int conf_get_uint32 (cdb const *c, char const *key, uint32_t *value) @@ -34,3 +35,20 @@ int conf_get_uint32 (cdb const *c, char const *key, uint32_t *value) uint32_unpack_big(data.s, value) ; return 1 ; } + +int conf_get_uint64 (cdb const *c, char const *key, uint64_t *value) +{ + cdb_data data ; + if (!conf_get(conf, key, &data)) return 0 ; + if (data.len != 8) return (errno = EPROTO, 0) ; + uint64_unpack_big(data.s, value) ; + return 1 ; +} + +char const *conf_get_string (cdb const *c, char const *key) +{ + cdb_data data ; + if (!conf_get(conf, key, &data)) return 0 ; + if (!data.len || data.s[data.len - 1]) return (errno = EPROTO, 0) ; + return data.s ; +} diff --git a/src/cache/shibari-cache-internal.h b/src/cache/shibari-cache-internal.h index 7e03282..606d245 100644 --- a/src/cache/shibari-cache-internal.h +++ b/src/cache/shibari-cache-internal.h @@ -5,7 +5,9 @@ #include +#include #include +#include #include #include #include @@ -14,11 +16,12 @@ #include -#define MAXXED 1000 /* cache */ -extern dcache_t cache ; +extern void cache_init (uint64_t) ; +extern void cache_dump (void) ; +extern void cache_load (void) ; /* conf */ @@ -26,6 +29,8 @@ extern dcache_t cache ; extern int conf_getb (cdb const *, char const *, size_t, cdb_data *) ; extern int conf_get (cdb const *, char const *, cdb_data *) ; extern int conf_get_uint32 (cdb const *, char const *, uint32_t *) ; +extern int conf_get_uint64 (cdb const *, char const *, uint64_t *) ; +extern char const *conf_get_string (cdb const *, char const *) ; /* tcpconnection */ @@ -36,11 +41,18 @@ struct tcpconnection_s bufalloc out ; stralloc in ; uint32_t instate ; + tain rdeadline ; + tain wdeadline ; + uint32_t prev ; + uint32_t next ; + uint32_t xindex ; } ; -#define TCPCONNECTION_ZERO { .out = BUFALLOC_ZERO, .in = STRALLOC_ZERO, .instate = 0 } +#define TCPCONNECTION_ZERO { .out = BUFALLOC_ZERO, .in = STRALLOC_ZERO, .instate = 0, .rdeadline = TAIN_INFINITE, .wdeadline = TAIN_INFINITE, .prev = 0, .next = 0. .xindex = UINT32_MAX } +#define ntcp (genset_n(&g->tcpconnections) - 1) +#define TCPCONNECTION(i) genset_p(tcpconnection, &g->tcpconnections, (i)) +#define tcpstart (TCPCONNECTION(g->tcpsentinel)->next) -extern genset *tcpconn ; /* tcpconnection */ -#define ntcp (genset_n(tcpconn)) +extern void tcpconnection_drop (tcpconnection *) ; /* udpqueue */ @@ -69,8 +81,12 @@ struct udpqueue_s int fd ; stralloc storage ; genalloc messages ; /* udp[46]msg */ + tain deadline ; + uint32_t xindex ; } ; -#define UDPQUEUE_ZERO { .fd = -1, .storage = STRALLOC_ZERO, .messages = GENALLOC_ZERO } +#define UDPQUEUE_ZERO { .fd = -1, .storage = STRALLOC_ZERO, .messages = GENALLOC_ZERO, .deadline = TAIN_INFINITE, .xindex = UINT32_MAX } + +extern void udpqueue_drop (udpqueue *) ; extern int udpqueue_add4 (udpqueue *, char const *, uint16_t) ; extern int udpqueue_flush4 (udpqueue *) ; @@ -81,17 +97,56 @@ extern int udpqueue_flush6 (udpqueue *) ; #endif - /* main */ + /* query */ typedef struct query_s query, *query_ref ; struct query_s { s6dns_engine_t dt ; - size_t origin ; + uint32_t origin ; + uint32_t prev ; + uint32_t next ; + uint32_t xindex ; + char ip[16] ; + uint16_t port ; } ; +#define QUERY_ZERO { .dt = S6DNS_ENGINE_ZERO, .origin = 0, .prev = 0, .next = 0, .xindex = UINT32_MAX, .ip = { 0 }, .port = 0 } +#define nq (genset_n(&g->queries) - 1) +#define QUERY(i) genset_p(query, &g->queries, (i)) +#define qstart (QUERY(g->qsentinel)->next) + +extern void query_fail (query *) ; +extern void query_success (query *) ; +extern void query_new (uint32_t, char const *, uint8_t, uint16_t, char const *, uint16_t) ; -extern uint32_t verbosity ; -extern cdb confdb ; -extern size_t n4, n6 ; + + /* main */ + +typedef struct global_s global, *global_ref ; +struct global_s +{ + cdb confdb ; + char const *dumpfile ; + uint32_t verbosity ; + tain rtto ; + tain wtto ; + genset tcpconnections ; /* tcpconnection */ + uint32_t tcpsentinel ; + genset queries ; /* query */ + uint32_t qsentinel ; +} ; +#define GLOBAL_ZERO { \ + .confdb = CDB_ZERO, \ + .dumpfile = 0, \ + .verbosity = 1, \ + .rtto = TAIN_INFINITE, \ + .wtto = TAIN_INFINITE, \ + .tcpconnections = GENSET_ZERO, \ + .tcpsentinel = 0, \ + .queries = GENSET_ZERO, \ + .qsentinel = 0, \ +} + +extern global *g ; #endif diff --git a/src/cache/shibari-cache.c b/src/cache/shibari-cache.c index 09743d9..99d52b8 100644 --- a/src/cache/shibari-cache.c +++ b/src/cache/shibari-cache.c @@ -11,7 +11,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -29,21 +31,64 @@ #include "shibari-cache-internal.h" -#define USAGE "shibari-cache [ -U ] [ -d notif ] [ -f conf.cdb ] [ -D cachedumpfile ] [ -w wtimeout ] [ -i rulesdir | -x rulesfile ]" +#define USAGE "shibari-cache [ -U ] [ -d notif ] [ -f conf.cdb ]" #define dieusage() strerr_dieusage(100, USAGE) +#define MAXSAME 32 -uint32_t verbosity ; -cdb confdb = CDB_ZERO ; -size_t n4 = 0, n6 = 0, ntcp = 0 ; +global *g = 0 ; -static int cont = 1 ; -static int sfd = -1 ; -static char const *dumpfile = 0 ; +static int flagwantfinaldump = 1 ; +static tain lameduckt = TAIN_INFINITE_RELATIVE ; -static inline void reload (void) +static inline void conf_init (char const *conffile, uint32_t *n4, uint32_t *n6, char const **ip4, char const **ip6, uint32_t *maxtcp, uint32_t *maxqueries) { + cdb_data data ; + uint32_t u ; + if (!conf_get_uint32(&g->confdb, "G:logv", &g->verbosity)) + strerr_diefu4sys(102, "read ", "G:logv", " configuration key from ", conffile) ; + { + uint64_t cachesize ; + if (!conf_get_uint64(&g->confdb, "G:cachesize", &cachesize)) + strerr_diefu4sys(102, "read ", "G:cachesize", " configuration key from ", conffile) ; + if (cachesize < 4096) + strerr_dief2x(102, "invalid G:cachesize in ", conffile) ; + cache_init(cachesize) ; + } + if (!conf_get_uint32(&g->confdb, "G:maxtcp", maxtcp)) + strerr_diefu4sys(102, "read ", "G:maxtcp", " configuration key from ", conffile) ; + if (*maxtcp > 4096 || *maxtcp < 1) + strerr_dief2x(102, "invalid G:maxtcp in ", conffile) ; + if (!conf_get_uint32(&g->confdb, "G:maxqueries", maxqueries)) + strerr_diefu4sys(102, "read ", "G:maxqueries", " configuration key from ", conffile) ; + if (*maxqueries > 8192 || *maxqueries < 1) + strerr_dief2x(102, "invalid G:maxqueries in ", conffile) ; + if (!conf_get_uint32(&g->confdb, "G:rtimeout", &u)) + if (u) tain_from_millisecs(&g->rtto, u) ; + strerr_diefu4sys(102, "read ", "G:rtimeout", " configuration key from ", conffile) ; + if (!conf_get_uint32(&g->confdb, "G:wtimeout", &u)) + strerr_diefu4sys(102, "read ", "G:wtimeout", " configuration key from ", conffile) ; + if (u) tain_from_millisecs(&g->wtto, u) ; + g->dumpfile = conf_get_string(&g->confdb, "G:cachefile") ; + if (!g->dumpfile && errno != ENOENT) + strerr_diefu4sys(102, "read ", "G:cachefile", " configuration key from ", conffile) ; + + if (!conf_get(&g->confdb, "G:listen4", &data)) + strerr_diefu4sys(102, "read ", "G:listen4", " configuration key from ", conffile) ; + if (data.len & 3) + strerr_diefu4sys(102, "invalid ", "G:listen4", " key in ", conffile) ; + *n4 = data.len >> 2 ; + *ip4 = data.s ; +#ifdef SKALIBS_IPV6_ENABLED + if (!conf_get(&g.confdb, "G:listen6", &data)) + strerr_diefu4sys(102, "read ", "G:listen6", " configuration key from ", conffile) ; + if (data.len & 15) + strerr_diefu4sys(102, "invalid ", "G:listen6", " key in ", conffile) ; + *n6 = data.len >> 4 ; + *ip6 = data.s ; +#endif + if (!*n4 && !*n6) strerr_dief1x(102, "no listen addresses configured" ; } static inline void handle_signals (void) @@ -52,43 +97,49 @@ static inline void handle_signals (void) { case -1 : strerr_diefu1sys(111, "read selfpipe") ; case 0 : return ; - case SIGTERM : cont = 0 ; break ; - case SIGHUP : reload() ; break ; - case SIGALRM : dump_cache(dumpfile) ; break ; + case SIGHUP : flagwantfinaldump = 0 ; /* fallthrough */ + case SIGTERM : + if (cont >= 2) + { + tain_add_g(&lameduckt, &lameduckt) ; + cont = 1 ; + } + break ; + case SIGQUIT : cont = 0 ; flagwantfinaldump = 0 ; break ; + case SIGALRM : cache_dump() ; break ; default : break ; } } int main (int argc, char const *const *argv) { + global globals = GLOBAL_ZERO ; char const *conffile = SHIBARI_SYSCONFDIR "/shibari-cache.conf.cdb" ; + uint32_t n4 = 0, n6 = 0, maxtcp, maxqueries ; + char const *ip4 = 0, *ip6 = 0 ; + unsigned int cont = 2 ; + int sfd = -1 ; unsigned int notif = 0 ; - char const *ip4 ; - char const *ip6 ; uid_t uid = 0 ; gid_t gid = 0 ; PROG = "shibari-cache" ; + g = &globals ; { int flagdrop = 0 ; subgetopt l = SUBGETOPT_ZERO ; for (;;) { - int opt = subgetopt_r(argc, argv, "Ud:f:D:w:i:x:", &l) ; + int opt = subgetopt_r(argc, argv, "Ud:f:", &l) ; if (opt == -1) break ; switch (opt) { case 'U' : flagdrop = 1 ; break ; case 'd' : if (!uint0_scan(l.arg, ¬if)) dieusage() ; break ; case 'f' : conffile = l.arg ; break ; - case 'D' : dumpfile = l.arg ; break ; - case 'w' : if (!uint0_scan(l.arg, &wtimeout)) dieusage() ; break ; - case 'i' : rulesfile = l.arg ; rulestype = 1 ; break ; - case 'x' : rulesfile = l.arg ; rulestype = 2 ; break ; default : strerr_dieusage(10, USAGE) ; } } argc -= l.ind ; argv += l.ind ; - if (!ip46_scan(argv[0], &localip)) dieusage() ; if (flagdrop) { char const *x = getenv("UID") ; @@ -98,7 +149,6 @@ int main (int argc, char const *const *argv) if (!x) strerr_dienotset(100, "GID") ; if (!uid0_scan(x, &gid)) strerr_dieinvalid(100, "GID") ; } - if (wtimeout) tain_from_millisecs(&wtto, wtimeout) ; } if (notif) @@ -109,6 +159,10 @@ int main (int argc, char const *const *argv) close(0) ; close(1) ; + + if (!cdb_init(&g->confdb, conffile)) strerr_diefu2sys(111, "open ", conffile) ; + conf_init(conffile, &n4, &n6, &ip4, &ip6, &maxtcp, &maxqueries, &dumpfile) ; + sfd = selfpipe_init() ; if (sfd == -1) strerr_diefu1sys(111, "create selfpipe") ; if (!sig_altignore(SIGPIPE)) strerr_diefu1sys(111, "ignore SIGPIPE") ; @@ -117,60 +171,53 @@ int main (int argc, char const *const *argv) sigemptyset(&set) ; sigaddset(&set, SIGHUP) ; sigaddset(&set, SIGTERM) ; + sigaddset(&set, SIGQUIT) ; sigaddset(&set, SIGALRM) ; if (!selfpipe_trapset(&set)) strerr_diefu1sys(111, "trap signals") ; } - if (!cdb_init(&confdb, conffile)) strerr_diefu2sys(111, "open cdb file ", conffile) ; - { - cdb_data data ; - if (!conf_get_uint32(&confdb, "G:logv", &verbosity)) - strerr_diefu1sys(111, "read verbosity from config") ; - if (!conf_get_uint32(&confdb, "G:maxtcp", &maxtcp)) - strerr_diefu1sys(111, "read maxtcp from config") ; - if (maxtcp > 4000 || maxtcp < 1) - strerr_dief1x(102, "invalid maxtcp in config") ; - if (!conf_get(&confdb, "G:listen4", &data)) - strerr_diefu3sys(111, "read ", "G:listen4", " entry from config") ; - if (data.len & 3) - strerr_diefu2sys(102, "invalid length for ", "G:listen4") ; - n4 = data.len >> 2 ; - ip4 = data.s ; -#ifdef SKALIBS_IPV6_ENABLED - if (!conf_get(&confdb, "G:listen6", &data)) - strerr_diefu3sys(111, "read ", "G:listen6", " entry from config") ; - if (data.len & 15) - strerr_diefu2sys(102, "invalid length for ", "G:listen6") ; - n6 = data.len >> 4 ; - ip6 = data.s ; -#endif - } - if (!n4 && !n6) strerr_diefu1x(102, "no listen addresses configured" ; + udpqueue udpq4[n4 ? n4 : 1] ; + udpqueue udpq6[n6 ? n6 : 1] ; + int tcp4fd[n4 ? n4 : 1] ; + int tcp6fd[n6 ? n6 : 1] ; + uint32_t tcp4xindex[n4 ? n4 : 1] ; + uint32_t tcp6xindex[n4 ? n4 : 1] ; + tcpconnection tcpconnection_storage[maxtcp + 1] ; + uint32_t tcpconnection_freelist[maxtcp + 1] ; + query query_storage[maxqueries + 1] ; + uint32_t query_freelist[maxqueries + 1] ; - { - genalloc queries = GENALLOC_ZERO ; /* query */ - int fd4[n4 ? n4 : 1][2] ; - int fd6[n6 ? n6 : 1][2] ; - tcpconnection tcpconn_storage[maxtcp] ; - uint32_t tcpconn_freelist[maxtcp] ; - genset tcpconn_genset ; - tcpconn = &tcpconn_genset ; - GENSET_init(tcpconn, tcpconnection, tcpconn_storage, tcpconn_freelist, maxtcp) ; + memset(udpq4, 0, n4 * sizeof(udpqueue)) ; + memset(udpq6, 0, n6 * sizeof(udpqueue)) ; + GENSET_init(g->tcpconnections, tcpconnection, tcpconnection_storage, tcpconnection_freelist, maxtcp + 1) ; + g->tcpsentinel = genset_new(g->tcpconnections) ; + GENSET_init(g->queries, query, query_storage, query_freelist, maxqueries + 1) ; + g->qsentinel = genset_new(g->queries) ; + { + static const tcpconnection tcpconnection_zero = TCPCONNECTION_ZERO ; + static const query query_zero = QUERY_ZERO ; + tcpconnection *p = TCPCONNECTION(g->tcpsentinel) ; + query *q = QUERY(g->qsentinel) ; + *p = tcpconnection_zero ; + p->prev = p->next = g->tcpsentinel ; + *q = query_zero ; + q->prev = q->next = g->qsentinel ; + } for (size_t i = 0 ; i < n4 ; i++) { - fd4[i][0] = socket_udp4_nbcoe() ; - if (fd4[i][0] == -1) strerr_diefu1sys(111, "create udp4 socket") ; - if (socket_bind4_reuse(fd4[i][0], ip4 + (i << 2), 53) == -1) + udpq4[i].fd = socket_udp4_nbcoe() ; + if (udpq4[i].fd == -1) strerr_diefu1sys(111, "create udp4 socket") ; + if (socket_bind4_reuse(udpq4[i].fd, ip4 + (i << 2), 53) == -1) { char fmt[IP4_FMT] ; fmt[ip4_fmt(fmt, ip4 + (i << 2))] = 0 ; strerr_diefu3sys(111, "bind to ip ", fmt, " UDP port 53") ; } - fd4[i][1] = socket_tcp4_nbcoe() ; - if (fd4[i][1] == -1) strerr_diefu1sys(111, "create tcp4 socket") ; - if (socket_bind4_reuse(fd4[i][1], ip4 + (i << 2), 53) == -1) + tcp4fd[i] = socket_tcp4_nbcoe() ; + if (tcp4fd[i] == -1) strerr_diefu1sys(111, "create tcp4 socket") ; + if (socket_bind4_reuse(tcp4fd[i], ip4 + (i << 2), 53) == -1) { char fmt[IP4_FMT] ; fmt[ip4_fmt(fmt, ip4 + (i << 2))] = 0 ; @@ -180,17 +227,17 @@ int main (int argc, char const *const *argv) #ifdef SKALIBS_IPV6_ENABLED for (size_t i = 0 ; i < n6 ; i++) { - fd6[i][0] = socket_udp6_nbcoe() ; - if (fd6[i][0] == -1) strerr_diefu1sys(111, "create udp6 socket") ; - if (socket_bind6_reuse(fd6[i][0], ip6 + (i << 4), 53) == -1) + udpq6[i].fd = socket_udp6_nbcoe() ; + if (udpq6[i].fd == -1) strerr_diefu1sys(111, "create udp6 socket") ; + if (socket_bind6_reuse(udpq6[i].fd, ip6 + (i << 4), 53) == -1) { char fmt[IP6_FMT] ; fmt[ip6_fmt(fmt, ip6 + (i << 4))] = 0 ; strerr_diefu3sys(111, "bind to ip ", fmt, " UDP port 53") ; } - fd6[i][1] = socket_tcp6_nbcoe() ; - if (fd6[i][1] == -1) strerr_diefu1sys(111, "create tcp6 socket") ; - if (socket_bind4_reuse(fd6[i][1], ip6 + (i << 4), 53) == -1) + tcp6fd[i] = socket_tcp6_nbcoe() ; + if (tcp6fd[i] == -1) strerr_diefu1sys(111, "create tcp6 socket") ; + if (socket_bind4_reuse(tcp6fd[i], ip6 + (i << 4), 53) == -1) { char fmt[IP6_FMT] ; fmt[ip6_fmt(fmt, ip6 + (i << 4))] = 0 ; @@ -201,6 +248,8 @@ int main (int argc, char const *const *argv) if (gid && setgid(gid) == -1) strerr_diefu1sys(111, "setgid") ; if (uid && setuid(uid) == -1) strerr_diefu1sys(111, "setuid") ; + + cache_load() ; if (!tain_now_set_stopwatch_g()) strerr_diefu1sys(111, "initialize clock") ; if (notif) @@ -209,18 +258,266 @@ int main (int argc, char const *const *argv) close(notif) ; } - while (cont) + + /* main loop */ + + while (cont) /* quick exit condition */ { - size_t n = genalloc_len(query, &queries) ; - iopause_fd x[1 + n4 + n6 + n] ; + tain deadline = TAIN_INFINITE ; + int r = 0 ; + uint32_t j = 1 ; + iopause_fd x[1 + (n4 + n6) * 2 + ntcp + nq] ; + + + /* preparation */ + x[0].fd = sfd ; x[0].events = IOPAUSE_READ ; - for (size_t i = 0 ; i < n4 ; i++) + if (cont == 1 && tain_less(&lameduckt, &deadline)) deadline = lameduckt ; + + for (uint32_t i = 0 ; i < n4 ; i++) { + x[j].fd = udpq4[i].fd ; + x[j].events = nq < maxqueries && cont >= 2 ? IOPAUSE_READ : 0 ; + if (genalloc_len(udp4msg, &udpq4[i].messages)) + { + x[j].events |= IOPAUSE_WRITE ; + if (tain_less(&udpq4[i].deadline, &deadline)) deadline = udpq4[i].deadline ; + r = 1 ; + } + if (x[j].events) udpq4[i].xindex = j++ ; else udpq4[i].xindex = UINT32_MAX ; + + if (ntcp < maxtcp && cont >= 2) + { + x[j].fd = tcp4fd[i] ; + x[j].events = IOPAUSE_READ ; + tcp4xindex[i] = j++ ; + } + else tcp4xindex[i] = UINT32_MAX ; + } + +#ifdef SKALIBS_IPV6_ENABLED + for (uint32_t i = 0 ; i < n6 ; i++) + { + x[j].fd = udpq6[i].fd ; + x[j].events = nq < maxqueries && cont >= 2 ? IOPAUSE_READ : 0 ; + if (genalloc_len(udp6msg, &udpq6[i].messages)) + { + x[j].events |= IOPAUSE_WRITE ; + if (tain_less(&udpq6[i].deadline, &deadline)) deadline = udpq6[i].deadline ; + r = 1 ; + } + if (x[j].events) udpq6[i].xindex = j++ ; else udpq6[i].xindex = UINT32_MAX ; + + if (ntcp < maxtcp && cont >= 2) + { + x[j].fd = tcp6fd[i] ; + x[j].events = IOPAUSE_READ ; + tcp6xindex[i] = j++ ; + } + else tcp6xindex[i] = UINT32_MAX ; + } +#endif + + for (uint32_t i = tcpstart ; i != g->tcpsentinel ; i = TCPCONNECTION(i)->next) + { + tcpconnection *p = TCPCONNECTION(i) ; + x[j].fd = bufalloc_fd(&p->out) ; + if (nq < maxqueries && cont >= 2) + { + x[j].events = IOPAUSE_READ ; + if (tain_less(&p->rdeadline, &deadline)) deadline = p->rdeadline ; + } + else x[j].events = 0 ; + if (bufalloc_len(&p->out)) + { + x[j].events |= IOPAUSE_WRITE ; + if (tain_less(&p->wdeadline, &deadline)) deadline = p->wdeadline ; + r = 1 ; + } + if (x[j].events) p->xindex = j++ ; else p->xindex = UINT32_MAX ; } - } + for (uint32_t i = qstart ; i != g->qsentinel ; i = QUERY(i)->next) + { + query *p = QUERY(i) ; + x[j].fd = p->dt.fd ; + s6dns_engine_nextdeadline(&p->dt, &deadline) ; + x[j].events = (s6dns_engine_isreadable(&p->dt) ? IOPAUSE_READ : 0) | (s6dns_engine_iswritable(&p->dt) ? IOPAUSE_WRITE : 0) ; + if (x[j].events) p->xindex = j++ ; else p->xindex = UINT32_MAX ; + } + + + /* exit condition */ + + if (cont < 2 && !r && !nq) break ; + + + /* poll() */ + + r = iopause_g(x, j, &deadline) ; + if (r == -1) strerr_diefu1sys(111, "iopause") ; + + + /* timeout */ + + if (!r) + { + if (cont == 1 && !tain_future(&lameduckt)) break ; + for (uint32_t i = qstart ; i != g->qsentinel ; i = QUERY(i)->next) + { + query *p = QUERY(i) ; + if (s6dns_engine_timeout_g(&p->dt)) { i = p->prev ; query_fail(p) ; } + } + for (uint32_t i = tcpstart ; i != g->tcpsentinel ; i = TCPCONNECTION(i)->next) + { + tcpconnection *p = TCPCONNECTION(i) ; + if (!tain_future(&p->rdeadline) || !tain_future(&p->wdeadline)) + tcpconnection_drop(p) ; + } + for (uint32_t i = 0 ; i < n4 ; i++) + if (!tain_future(&udp4q[i].deadline)) udpqueue_drop(udp4q + i) ; + for (uint32_t i = 0 ; i < n6 ; i++) + if (!tain_future(&udp6q[i].deadline)) udpqueue_drop(udp6q + i) ; + } + + + /* event */ + + else + { + for (uint32_t i = 0 ; i < j ; i++) if (x[i].revents & IOPAUSE_EXCEPT) x[i].revents |= x[i].events ; + + if (x[0].revents & IOPAUSE_READ) { handle_signals() ; continue ; } + + for (uint32_t i = 0 ; i < n4 ; i++) if (udpq4[i].xindex < UINT32_MAX) + { + if (x[udpq4[i].xindex].revents & IOPAUSE_WRITE) + { + if (udpqueue_flush4(udpq4 + i) == -1) + { + char fmt[IP4_FMT] ; + fmt[ip4_fmt(fmt, ip4 + (i << 2))] = 0 ; + strerr_diefu2sys(111, "write to UDP socket bound to ", fmt) ; + } + } + } + +#ifdef SKALIBS_IPV6_ENABLED + for (uint32_t i = 0 ; i < n6 ; i++) if (udpq6[i].xindex < UINT32_MAX) + { + if (x[udpq6[i].xindex].revents & IOPAUSE_WRITE) + { + if (udpqueue_flush6(udpq6 + i) == -1) + { + char fmt[IP6_FMT] ; + fmt[ip6_fmt(fmt, ip6 + (i << 4))] = 0 ; + strerr_diefu2sys(111, "write to socket bound to ", fmt) ; + } + } + } +#endif + + for (uint32_t i = tcpstart ; i != g->tcpsentinel ; i = TCPCONNECTION(i)->next) + { + tcpconnection *p = TCPCONNECTION(i) ; + if (p->xindex < UINT32_MAX && x[p->xindex].revents & IOPAUSE_WRITE) + { + if (tcpconnection_flush(p) == -1) + { + i = p->prev ; + tcpconnection_drop(p) ; + } + } + } + + for (uint32_t i = qstart ; i != g->qsentinel ; i = QUERY(i)->next) + { + query *p = QUERY(i) ; + if (p->xindex == UINT32_MAX) continue ; + r = s6dns_engine_event_g(&p->dt) ; + if (r) i = p->prev ; + if (r == -1) query_fail(p) ; else query_success(p) ; + } + + for (uint32_t i = 0 ; i < n4 ; i++) + { + if (udpq4[i].xindex < UINT32_MAX && x[udpq4[i].xindex].revents & IOPAUSE_READ) + { + uint32_t n = MAXSAME ; + char buf[513] ; + char ip[4] ; + uint16_t port ; + while (n-- && nq < maxqueries) + { + ssize_t len = sanitize_read(socket_recv4(udpq4[i].fd, buf, 512, ip, &port)) ; + if (len == -1) + { + char fmt[IP4_FMT] ; + fmt[ip4_fmt(fmt, ip4 + (i << 2))] = 0 ; + strerr_diefu2sys(111, "read from UDP socket bound to ", fmt) ; + } + if (!len) break ; + if (len < 12 || len > 512) continue ; + if (!ip4_access(ip)) continue ; + query_new(i, ip, 4, port, buf, len) ; + } + } + } + +#ifdef SKALIBS_IPV6_ENABLED + for (uint32_t i = 0 ; i < n6 ; i++) + { + if (udpq6[i].xindex < UINT32_MAX && x[udpq6[i].xindex].revents & IOPAUSE_READ) + { + uint32_t n = MAXSAME ; + char buf[513] ; + char ip[16] ; + uint16_t port ; + while (n-- && nq < maxqueries) + { + ssize_t len = sanitize_read(socket_recv6(udpq6[i].fd, buf, 512, ip, &port)) ; + if (len == -1) + { + char fmt[IP6_FMT] ; + fmt[ip6_fmt(fmt, ip6 + (i << 4))] = 0 ; + strerr_diefu2sys(111, "read from UDP socket bound to ", fmt) ; + } + if (!len) break ; + if (len < 12 || len > 512) continue ; + if (!ip6_access(ip)) continue ; + query_new(n4 + i, ip, 16, port, buf, len) ; + } + } + } +#endif + + for (uint32_t i = tcpstart ; i != g->tcpsentinel ; i = TCPCONNECTION(i)->next) + { + tcpconnection *p = TCPCONNECTION(i) ; + if (p->xindex < UINT32_MAX && x[p->xindex].revents & IOPAUSE_READ) + { + int l = sanitize_read(mininetstring_read(bufalloc_fd(&p->out), &p->in, &p->instate)) ; + if (l == -1) { i = p->prev ; tcpconnection_drop(p) ; } + if (l <= 0) continue ; + if (sa.len < 12 || sa.len > 65536) { i = p->prev ; tcpconnection_drop(p) ; continue ; } + query_new(n4 + n6 + i, 0, 0, 0, sa.s, sa.len) ; + sa.len = 0 ; + } + } + + for (uint32_t i = 0 ; i < n4 ; i++) if (tcp4xindex[i] < UINT32_MAX) + { + if (x[tcp4index[i]].revents & IOPAUSE_READ) + { + } + } + + } + } } + + if (flagwantfinaldump) cache_dump() ; shibari_log_exit(verbosity, 0) ; return 0 ; } diff --git a/src/cache/tcpconnection.c b/src/cache/tcpconnection.c deleted file mode 100644 index bd72fb4..0000000 --- a/src/cache/tcpconnection.c +++ /dev/null @@ -1,5 +0,0 @@ -/* ISC license. */ - -#include "shibari-cache-internal.h" - -genset *tcpconn = 0 ; diff --git a/src/cache/udpqueue.c b/src/cache/udpqueue.c index 022df3d..8e1443a 100644 --- a/src/cache/udpqueue.c +++ b/src/cache/udpqueue.c @@ -2,6 +2,8 @@ #include +#include +#include #include #include #include @@ -9,12 +11,20 @@ #include "shibari-cache-internal.h" +void udpqueue_drop (udpqueue *q) +{ + q->storage.len = 0 ; + q->messages.len = 0 ; + tain_add_g(&q->deadline, &tain_infinite_relative) ; +} + int udpqueue_add4 (udpqueue *q, char const *ip, uint16_t port, char const *s, uint16_t len) { udp4msg msg = { .port = port, .len = len } ; if (!stralloc_readyplus(&q->storage, len)) return 0 ; memcpy(msg.ip, ip, 4) ; if (!genalloc_append(udp4msg, &q->messages, &msg)) return 0 ; + if (!q->storage.len) tain_add_g(&q->deadline, &g->wtto) ; stralloc_catb(&q->storage, s, len) ; return 1 ; } @@ -23,6 +33,7 @@ int udpqueue_flush4 (udpqueue *q) { size_t n = genalloc_n(udp4msg, &q->messages) ; size_t shead = 0, head = 0 ; + ssize_t r ; while (head < n) { udp4msg const *msg = genalloc_s(udp4msg, &q->messages) + head ; @@ -30,8 +41,7 @@ int udpqueue_flush4 (udpqueue *q) if (r <= 0) goto adjust ; shead += msg->len ; } - q->storage.len = 0 ; - genalloc_setlen(udp4msg, &q->messages, 0) ; + udpqueue_drop(q) ; return 1 ; adjust: @@ -39,7 +49,8 @@ int udpqueue_flush4 (udpqueue *q) q->storage.len -= shead ; memmove(genalloc_s(&udp4msg, &q->messages), genalloc_s(&udp4msg, &q->messages) + head, (n - head) * sizeof(udp4msg)) ; genalloc_setlen(&udp4msg, &q->messages, n - head) ; - return 0 ; + if (shead) tain_add_g(&q->deadline, &g->wtto) ; + return sanitize_read(r) ; } #ifdef SKALIBS_IPv6_ENABLED @@ -50,6 +61,7 @@ int udpqueue_add6 (udpqueue *q, char const *ip, uint16_t port, char const *s, ui if (!stralloc_readyplus(&q->storage, len)) return 0 ; memcpy(msg.ip, ip, 16) ; if (!genalloc_append(udp6msg, &q->messages, &msg)) return 0 ; + if (!q->storage.len) tain_add_g(&q->deadline, &g->wtto) ; stralloc_catb(&q->storage, s, len) ; return 1 ; } @@ -58,15 +70,15 @@ int udpqueue_flush6 (udpqueue *q) { size_t n = genalloc_n(udp6msg, &q->messages) ; size_t shead = 0, head = 0 ; + ssize_t r ; while (head < n) { udp6msg const *msg = genalloc_s(udp4msg, &q->messages) + head ; - ssize_t r = socket_send6(q->fd, q->storage.s + shead, msg->len, msg->ip, msg->port) ; + r = socket_send6(q->fd, q->storage.s + shead, msg->len, msg->ip, msg->port) ; if (r <= 0) goto adjust ; shead += msg->len ; } - q->storage.len = 0 ; - genalloc_setlen(udp6msg, &q->messages, 0) ; + udpqueue_drop(q) ; return 1 ; adjust: @@ -74,7 +86,8 @@ int udpqueue_flush6 (udpqueue *q) q->storage.len -= shead ; memmove(genalloc_s(&udp6msg, &q->messages), genalloc_s(&udp6msg, &q->messages) + head, (n - head) * sizeof(udp6msg)) ; genalloc_setlen(&udp6msg, &q->messages, n - head) ; - return 0 ; + if (shead) tain_add_g(&q->deadline, &g->wtto) ; + return sanitize_read(r) ; } #endif diff --git a/src/config/defaults.c b/src/config/defaults.c index 9fef25b..9222aa4 100644 --- a/src/config/defaults.c +++ b/src/config/defaults.c @@ -14,13 +14,18 @@ struct defaults_s #define REC(k, v, n) { .key = (k), .value = (v), .vlen = (n) } #define RECS(k, v) REC(k, (v), sizeof(v)) #define RECU32(k, u) { .key = (k), .value = (char const [4]){ (u) >> 24 & 0xffu, (u) >> 16 & 0xffu, (u) >> 8 & 0xffu, (u) & 0xffu }, .vlen = 4 } +#define RECU64(k, u) { .key = (k), .value = (char const [8]){ (u) >> 56 & 0xffu, (u) >> 48 & 0xffu, (u) >> 40 & 0xffu, (u) >> 32 & 0xffu, (u) >> 24 & 0xffu, (u) >> 16 & 0xffu, (u) >> 8 & 0xffu, (u) & 0xffu }, .vlen = 8 } static struct defaults_s const defaults[] = { RECU32("G:logv", 1), - RECU32("G:maxtcp", 256), REC("G:listen4", "\177\0\0\1", 4), REC("G:listen6", "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\1", 16), + RECU32("G:maxtcp", 256), + RECU32("G:maxqueries", 1024), + RECU64("G:cachesize", 1048576), + RECU32("G:rtimeout", 0), + RECU32("G:wtimeout", 0), REC("R4:", "\0\306\51\0\4" diff --git a/src/config/lexparse.c b/src/config/lexparse.c index 250fa62..093acf7 100644 --- a/src/config/lexparse.c +++ b/src/config/lexparse.c @@ -29,7 +29,12 @@ struct namevalue_s enum directivevalue_e { T_VERBOSITY, + T_CACHESIZE, T_MAXTCP, + T_MAXQUERIES, + T_RTIMEOUT, + T_WTIMEOUT, + T_CACHEFILE, T_LISTEN, T_ACCEPT, T_SERVER, @@ -101,25 +106,30 @@ static int ipcmp (void const *a, void const *b, size_t n) } -static inline void parse_verbosity (char const *s, size_t const *word, size_t n, char const *ifile, uint32_t line) +static void parse_u32 (char const *s, size_t const *word, size_t n, char const *ifile, uint32_t line, char const *directive, char const *key) { - uint32_t v ; + uint32_t u ; char pack[4] ; - if (n != 1) dieparse(ifile, line, "verbosity", n ? "too many arguments" : "too few arguments", 0, 0, 0) ; - if (!uint320_scan(s + word[0], &v)) dieparse(ifile, line, "verbosity", "argument must be an integer", 0, 0, 0) ; - uint32_pack_big(pack, v) ; - adds_unique(ifile, line, "verbosity", "G:logv", pack, 4) ; + if (n != 1) dieparse(ifile, line, directive, n ? "too many arguments" : "too few arguments", 0, 0, 0) ; + if (!uint320_scan(s + word[0], &u)) dieparse(ifile, line, directive, "argument must be an integer", 0, 0, 0) ; + uint32_pack_big(pack, u) ; + adds_unique(ifile, line, directive, key, pack, 4) ; } -static inline void parse_maxtcp (char const *s, size_t const *word, size_t n, char const *ifile, uint32_t line) +static void parse_u64 (char const *s, size_t const *word, size_t n, char const *ifile, uint32_t line, char const *directive, char const *key) { - uint32_t max ; - char pack[4] ; - if (n != 1) dieparse(ifile, line, "maxtcp", n ? "too many arguments" : "too few arguments", 0, 0, 0) ; - if (!uint320_scan(s + word[0], &max)) dieparse(ifile, line, "maxtcp", "argument must be an integer", 0, 0, 0) ; - if (max > 4000) dieparse(ifile, line, "maxtcp", "argument must be 4000 or less", 0, 0, 0) ; - uint32_pack_big(pack, max) ; - adds_unique(ifile, line, "maxtcp", "G:maxtcp", pack, 4) ; + uint64_t u ; + char pack[8] ; + if (n != 1) dieparse(ifile, line, directive, n ? "too many arguments" : "too few arguments", 0, 0, 0) ; + if (!uint640_scan(s + word[0], &u)) dieparse(ifile, line, directive, "argument must be an integer", 0, 0, 0) ; + uint64_pack_big(pack, u) ; + adds_unique(ifile, line, directive, key, pack, 8) ; +} + +static void parse_string (char const *s, size_t const *word, size_t n, char const *ifile, uint32_t line, char const *directive, char const *key) +{ + if (n != 1) dieparse(ifile, line, directive, n ? "too many arguments" : "too few arguments", 0, 0, 0) ; + adds_unique(ifile, line, directive, key, s + word[0], strlen(s + word[0]) + 1) ; } static inline void parse_listen (char const *s, size_t const *word, size_t n, char const *ifile, uint32_t line) @@ -195,11 +205,16 @@ static inline void process_line (char const *s, size_t const *word, size_t n, ch static struct namevalue_s const directives[] = { { .name = "accept", .value = T_ACCEPT }, + { .name = "cache_file", .value = T_CACHEFILE }, + { .name = "cache_size", .value = T_CACHESIZE }, { .name = "forward", .value = T_FORWARD }, { .name = "listen", .value = T_LISTEN }, + { .name = "maxqueries", .value = T_MAXQUERIES }, { .name = "maxtcp", .value = T_MAXTCP }, + { .name = "read_timeout", .value = T_RTIMEOUT }, { .name = "server", .value = T_SERVER }, { .name = "verbosity", .value = T_VERBOSITY }, + { .name = "write_timeout", .value = T_WTIMEOUT }, } ; struct namevalue_s const *directive ; char const *word0 ; @@ -210,10 +225,25 @@ static inline void process_line (char const *s, size_t const *word, size_t n, ch switch (directive->value) { case T_VERBOSITY : - parse_verbosity(s, word, n, ifile, line) ; + parse_u32(s, word, n, ifile, line, "verbosity", "G:logv") ; + break ; + case T_CACHESIZE : + parse_u64(s, word, n, ifile, line, "cache_size", "G:cachesize") ; break ; case T_MAXTCP : - parse_maxtcp(s, word, n, ifile, line) ; + parse_u32(s, word, n, ifile, line, "maxtcp", "G:maxtcp") ; + break ; + case T_MAXQUERIES : + parse_u32(s, word, n, ifile, line, "maxqueries", "G:maxqueries") ; + break ; + case T_RTIMEOUT : + parse_u32(s, word, n, ifile, line, "read_timeout", "G:rtimeout") ; + break ; + case T_WTIMEOUT : + parse_u32(s, word, n, ifile, line, "write_timeout", "G:wtimeout") ; + break ; + case T_CACHEFILE : + parse_string(s, word, n, ifile, line, "cache_file", "G:cachefile") ; break ; case T_LISTEN : parse_listen(s, word, n, ifile, line) ; diff --git a/src/include/shibari/dcache.h b/src/include/shibari/dcache.h index 6e0d0ab..50fc44d 100644 --- a/src/include/shibari/dcache.h +++ b/src/include/shibari/dcache.h @@ -35,20 +35,21 @@ struct dcache_s avltree by_key ; avltree by_entry ; avltree by_expire ; + uint64_t max ; uint64_t size ; uint64_t motion ; } ; -#define DCACHE_ZERO { .storage = GENSETDYN_ZERO, .by_key = AVLTREE_ZERO, .by_entry = AVLTREE_ZERO, .by_expire = AVLTREE_ZERO, .size = 0, .motion = 0 } +#define DCACHE_ZERO { .storage = GENSETDYN_ZERO, .by_key = AVLTREE_ZERO, .by_entry = AVLTREE_ZERO, .by_expire = AVLTREE_ZERO, .max = 0, .size = 0, .motion = 0 } extern void dcache_init (dcache_t *, uint64_t) ; extern dcache_node_t *dcache_search (dcache_t *, char const *, uint16_t) ; -extern int dcache_add (dcache_t *, uint64_t, char const *, uint16_t, char const *, uint16_t, tain const *, tain const *) ; -#define dcache_add_g(d, max, key, keylen, data, datalen, expire) dcache_add(d, max, key, keylen, data, datalen, (expire), &STAMP) +extern int dcache_add (dcache_t *, char const *, uint16_t, char const *, uint16_t, tain const *, tain const *) ; +#define dcache_add_g(d, key, keylen, data, datalen, expire) dcache_add(d, key, keylen, data, datalen, (expire), &STAMP) extern void dcache_clean_expired (dcache_t *, tain const *) ; #define dcache_clean_expired_g(d) dcache_clean_expired((d), &STAMP) extern void dcache_free (dcache_t *) ; extern int dcache_save (dcache_t const *, char const *) ; -extern int dcache_load (dcache_t *, uint64_t, char const *) ; +extern int dcache_load (dcache_t *, char const *) ; #endif diff --git a/src/libdcache/dcache_add.c b/src/libdcache/dcache_add.c index 7260726..7e1d871 100644 --- a/src/libdcache/dcache_add.c +++ b/src/libdcache/dcache_add.c @@ -75,11 +75,11 @@ static inline int dcache_add_unbounded (dcache_t *z, char const *key, uint16_t k return 1 ; } -int dcache_add (dcache_t *z, uint64_t max, char const *key, uint16_t keylen, char const *data, uint16_t datalen, tain const *expire, tain const *stamp) +int dcache_add (dcache_t *z, char const *key, uint16_t keylen, char const *data, uint16_t datalen, tain const *expire, tain const *stamp) { uint64_t size = DCACHE_NODE_OVERHEAD + keylen + datalen ; - if (size > max) return (errno = EINVAL, 0) ; - if (z->size > max - size) dcache_clean_expired(z, stamp) ; - if (z->size > max - size) dcache_gc_by_entry(z, max - size) ; + if (size > z->max) return (errno = EINVAL, 0) ; + if (z->size > z->max - size) dcache_clean_expired(z, stamp) ; + if (z->size > z->max - size) dcache_gc_by_entry(z, z->max - size) ; return dcache_add_unbounded(z, key, keylen, data, datalen, expire, stamp) ; } diff --git a/src/libdcache/dcache_init.c b/src/libdcache/dcache_init.c index d42ec62..91c529e 100644 --- a/src/libdcache/dcache_init.c +++ b/src/libdcache/dcache_init.c @@ -50,6 +50,7 @@ void dcache_init (dcache_t *z, uint64_t max) avltree_init(&z->by_key, max >> 9, 3, 8, &key_dtok, &key_cmp, &z->storage) ; avltree_init(&z->by_entry, max >> 9, 3, 8, &entry_dtok, &tain_cmp, &z->storage) ; avltree_init(&z->by_expire, max >> 9, 3, 8, &expire_dtok, &tain_cmp, &z->storage) ; + z->max = max ; z->size = 0 ; z->motion = 0 ; } diff --git a/src/libdcache/dcache_load.c b/src/libdcache/dcache_load.c index d1b9274..176a722 100644 --- a/src/libdcache/dcache_load.c +++ b/src/libdcache/dcache_load.c @@ -14,7 +14,7 @@ #include -static inline int dcache_load_node (dcache_t *z, uint64_t max, buffer *b) +static inline int dcache_load_node (dcache_t *z, buffer *b) { tain entry = { .nano = 0 } ; tain expire = { .nano = 0 } ; @@ -35,12 +35,12 @@ static inline int dcache_load_node (dcache_t *z, uint64_t max, buffer *b) if (!r) return (errno = EPIPE, -1) ; if (r < len) return -1 ; if (blob[len]) return (errno = EPROTO, -1) ; - if (!dcache_add(z, max, blob, keylen, blob + keylen, datalen, &expire, &entry)) return -1 ; + if (!dcache_add(z, blob, keylen, blob + keylen, datalen, &expire, &entry)) return -1 ; } return 1 ; } -static inline int dcache_load_from_buffer (dcache_t *z, uint64_t max, buffer *b) +static inline int dcache_load_from_buffer (dcache_t *z, buffer *b) { { char banner[sizeof(DCACHE_MAGIC) - 1] ; @@ -55,7 +55,7 @@ static inline int dcache_load_from_buffer (dcache_t *z, uint64_t max, buffer *b) } for (;;) { - int r = dcache_load_node(z, max, b) ; + int r = dcache_load_node(z, b) ; if (r < 0) return 0 ; if (!r) break ; } @@ -64,14 +64,14 @@ static inline int dcache_load_from_buffer (dcache_t *z, uint64_t max, buffer *b) #define N 8192 -int dcache_load (dcache_t *z, uint64_t max, char const *file) +int dcache_load (dcache_t *z, char const *file) { char buf[N] ; buffer b ; int fd = open_readb(file) ; if (fd == -1) return 0 ; buffer_init(&b, &buffer_read, fd, buf, N) ; - if (!dcache_load_from_buffer(z, max, &b)) goto err ; + if (!dcache_load_from_buffer(z, &b)) goto err ; fd_close(fd) ; return 1 ; -- cgit v1.2.3