diff options
Diffstat (limited to 'src/rpc/skabus_rpcd_query.c')
-rw-r--r-- | src/rpc/skabus_rpcd_query.c | 207 |
1 files changed, 207 insertions, 0 deletions
diff --git a/src/rpc/skabus_rpcd_query.c b/src/rpc/skabus_rpcd_query.c new file mode 100644 index 0000000..baa6df5 --- /dev/null +++ b/src/rpc/skabus_rpcd_query.c @@ -0,0 +1,207 @@ + /* ISC license. */ + +#include <sys/uio.h> +#include <string.h> +#include <stdint.h> +#include <skalibs/uint32.h> +#include <skalibs/uint64.h> +#include <skalibs/tai.h> +#include <skalibs/strerr2.h> +#include <skalibs/genalloc.h> +#include <skalibs/gensetdyn.h> +#include <skalibs/avltree.h> +#include <skalibs/unixmessage.h> +#include <skabus/rpc.h> +#include "skabus-rpcd.h" + +static void *query_serial_dtok (uint32_t d, void *x) +{ + (void)x ; + return &QUERY(d)->serial ; +} + +static void *query_deadline_dtok (uint32_t d, void *x) +{ + (void)x ; + return &QUERY(d)->deadline ; +} + +static int query_serial_cmp (void const *a, void const *b, void *x) +{ + uint64_t aa = *(uint64_t *)a ; + uint64_t bb = *(uint64_t *)b ; + (void)x ; + return aa < bb ? -1 : aa > bb ; +} + +static int query_deadline_cmp (void const *a, void const *b, void *x) +{ + tain_t const *aa = (tain_t const *)a ; + tain_t const *bb = (tain_t const *)b ; + (void)x ; + return tain_less(aa, bb) ? -1 : tain_less(bb, aa) ; +} + +gensetdyn queries = GENSETDYN_ZERO ; +static avltree qserialdict = AVLTREE_INIT(10, 1, 2, &query_serial_dtok, &query_serial_cmp, 0) ; +static avltree qdeadlinedict = AVLTREE_INIT(10, 1, 2, &query_deadline_dtok, &query_deadline_cmp, 0) ; + +static inline void query_delete (uint32_t i) +{ + if (!avltree_delete(&qdeadlinedict, &QUERY(i)->deadline)) + strerr_diefu1sys(111, "avltree_delete qdeadlinedict in query_delete") ; + if (!avltree_delete(&qserialdict, &QUERY(i)->serial)) + strerr_diefu1sys(111, "avltree_delete qserialdict in query_delete") ; + if (!gensetdyn_delete(&queries, i)) + strerr_diefu1sys(111, "gensetdyn_delete in query_delete") ; +} + +void query_remove (uint32_t i) +{ + query_t *q = QUERY(i) ; + client_t *c = CLIENT(q->client) ; + interface_t *y = INTERFACE(q->interface) ; + if (!gensetdyn_delete(&c->queries, q->clientindex)) + strerr_diefu1sys(111, "gensetdyn_delete c->queries in query_remove") ; + if (!gensetdyn_delete(&y->queries, q->interfaceindex)) + strerr_diefu1sys(111, "gensetdyn_delete y->queries in query_remove") ; + query_delete(i) ; +} + +void query_fail (uint32_t i, char status) +{ + query_t *q = QUERY(i) ; + client_t *c = CLIENT(q->client) ; + char pack[10] = "Rssssssssr" ; + unixmessage_t m = { .s = pack, .len = 10, .fds = 0, .nfds = 0 } ; + uint64_pack_big(pack+1, q->serial) ; + pack[9] = status ; + if (!unixmessage_put(&c->async.out, &m)) + strerr_diefu1sys(111, "unixmessage_put in query_fail") ; + query_remove(i) ; + client_setdeadline(c) ; +} + +int query_cancel (uint32_t i, char reason) +{ + query_t *q = QUERY(i) ; + interface_t *y = INTERFACE(q->interface) ; + client_t *c = CLIENT(y->client) ; + char pack[14] = "Ciiiissssssssr" ; + unixmessage_t m = { .s = pack, .len = 14, .fds = 0, .nfds = 0 } ; + uint32_pack_big(pack+1, y->id) ; + uint64_pack_big(pack+5, q->serial) ; + pack[13] = reason ; + if (!unixmessage_put(&c->async.out, &m)) return 0 ; + client_setdeadline(c) ; + return 1 ; +} + +int query_cancelremove (uint32_t i, char reason) +{ + if (!query_cancel(i, reason)) return 0 ; + query_remove(i) ; + return 1 ; +} + +int query_lookup_by_serial (uint64_t serial, uint32_t *d) +{ + return avltree_search(&qserialdict, &serial, d) ; +} + +int query_lookup_by_mindeadline (uint32_t *d) +{ + return avltree_min(&qdeadlinedict, d) ; +} + +void query_get_mindeadline (tain_t *deadline) +{ + uint32_t d ; + if (query_lookup_by_mindeadline(&d)) *deadline = QUERY(d)->deadline ; + else tain_add_g(deadline, &tain_infinite_relative) ; +} + +int query_add (uint32_t *d, tain_t const *deadline, uint32_t client, uint32_t interface) +{ + static uint64_t serial = 1 ; + uint32_t qq, cc, yy ; + query_t *q ; + if (!gensetdyn_new(&queries, &qq)) return 0 ; + if (!gensetdyn_new(&CLIENT(client)->queries, &cc)) goto end0 ; + if (!gensetdyn_new(&INTERFACE(interface)->queries, &yy)) goto end1 ; + q = QUERY(qq) ; + q->serial = serial ; + q->deadline = *deadline ; + q->client = client ; + q->clientindex = cc ; + q->interface = interface ; + q->interfaceindex = yy ; + if (!avltree_insert(&qserialdict, qq)) goto end2 ; + for (;;) + { + static tain_t const nano1 = { .sec = TAI_ZERO, .nano = 1 } ; + uint32_t d ; + if (!avltree_search(&qdeadlinedict, &q->deadline, &d)) break ; + tain_add(&q->deadline, &q->deadline, &nano1) ; + } + if (!avltree_insert(&qdeadlinedict, qq)) goto end3 ; + serial++ ; + *d = qq ; + return 1 ; + + end3: + if (!avltree_delete(&qserialdict, &serial)) + strerr_diefu1sys(111, "avltree_delete in query_add") ; + end2: + if (!gensetdyn_delete(&INTERFACE(interface)->queries, yy)) + strerr_diefu1sys(111, "gensetdyn_delete INTERFACE(interface)->queries in query_add") ; + end1: + if (!gensetdyn_delete(&CLIENT(client)->queries, cc)) + strerr_diefu1sys(111, "gensetdyn_delete CLIENT(client)->queries in query_add") ; + end0: + if (!gensetdyn_delete(&queries, qq)) + strerr_diefu1sys(111, "gensetdyn_delete queries in query_add") ; + return 0 ; +} + +int query_send (uint32_t qq, unixmessage_t const *m) +{ + skabus_rpc_rinfo_t rinfo ; + char pack[4 + SKABUS_RPC_RINFO_PACK] = "Q" ; + struct iovec v[2] = { { .iov_base = pack, .iov_len = 4 + SKABUS_RPC_RINFO_PACK }, { .iov_base = m->s, .iov_len = m->len } } ; + unixmessage_v_t mtosend = { .v = v, .vlen = 2, .fds = m->fds, .nfds = m->nfds } ; + query_t *q = QUERY(qq) ; + interface_t *y = INTERFACE(q->interface) ; + client_t *c = CLIENT(q->client) ; + char const *idstr = client_idstr(c) ; + size_t idstrlen = strlen(idstr) ; + rinfo.serial = q->serial ; + rinfo.limit = q->deadline ; + tain_copynow(&rinfo.timestamp) ; + rinfo.uid = c->uid ; + rinfo.gid = c->gid ; + memcpy(rinfo.idstr, idstr, idstrlen) ; + memset(rinfo.idstr + idstrlen, 0, SKABUS_RPC_IDSTR_SIZE + 1 - idstrlen) ; + uint32_pack_big(pack+1, y->id) ; + skabus_rpc_rinfo_pack(pack + 5, &rinfo) ; + c = CLIENT(y->client) ; + if (!unixmessage_putv_and_close(&c->async.out, &mtosend, unixmessage_bits_closeall)) return 0 ; + client_setdeadline(c) ; + return 1 ; +} + +void query_reply (uint32_t qq, char result, unixmessage_t const *m) +{ + char pack[11] = "R" ; + struct iovec v[2] = { { .iov_base = pack, .iov_len = 11 }, { .iov_base = m->s, .iov_len = m->len } } ; + unixmessage_v_t mtosend = { .v = v, .vlen = 2, .fds = m->fds, .nfds = m->nfds } ; + query_t *q = QUERY(qq) ; + client_t *c = CLIENT(q->client) ; + uint64_pack_big(pack+1, q->serial) ; + pack[9] = 0 ; + pack[10] = result ; + if (!unixmessage_putv_and_close(&c->async.out, &mtosend, unixmessage_bits_closeall)) + strerr_diefu1sys(111, "unixmessage_put in query_reply") ; + client_setdeadline(c) ; + query_remove(qq) ; +} |