diff options
author | Laurent Bercot <ska-skaware@skarnet.org> | 2017-11-24 18:54:03 +0000 |
---|---|---|
committer | Laurent Bercot <ska-skaware@skarnet.org> | 2017-11-24 18:54:03 +0000 |
commit | 6432cead1941d5305bc2d7f22821ca8a98f43f78 (patch) | |
tree | 1898075255d0e9ac285a22326356c564c1b6c839 /src/rpc | |
parent | 2c5c19923ec56298c4e6b4a71084b27414fd96d6 (diff) | |
download | skabus-6432cead1941d5305bc2d7f22821ca8a98f43f78.tar.xz |
Add skabus-rpc-daemon, skabus-rpcd and the skabus_rpc library
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/PROTOCOL | 82 | ||||
-rw-r--r-- | src/rpc/deps-exe/skabus-rpc-daemon | 1 | ||||
-rw-r--r-- | src/rpc/deps-exe/skabus-rpcd | 8 | ||||
-rw-r--r-- | src/rpc/skabus-rpc-daemon.c | 154 | ||||
-rw-r--r-- | src/rpc/skabus-rpcd.c | 542 | ||||
-rw-r--r-- | src/rpc/skabus-rpcd.h | 146 | ||||
-rw-r--r-- | src/rpc/skabus_rpcd_client.c | 188 | ||||
-rw-r--r-- | src/rpc/skabus_rpcd_interface.c | 113 | ||||
-rw-r--r-- | src/rpc/skabus_rpcd_query.c | 207 |
9 files changed, 1441 insertions, 0 deletions
diff --git a/src/rpc/PROTOCOL b/src/rpc/PROTOCOL new file mode 100644 index 0000000..4022556 --- /dev/null +++ b/src/rpc/PROTOCOL @@ -0,0 +1,82 @@ + + Registering (1st connection) (client -> server) + + 'S' : 1 + pmid : 4 + idlen : 1 + relen : 4 + idstr : idlen + '\0' : 1 + re : relen + '\0' : 1 + + Sending a query (qclient -> server) + + 'Q' : 1 + deadline : TAIN_PACK (12) + contains sec(8), nano(4) + iflen : 1 + ifname : iflen + '\0' : 1 + msg : msglen + + + Sending a query (server -> rclient) + + 'Q' : 1 + ifid : 4 + rinfo: SKABUS_RPC_RINFO_PACK + contains serial(8), deadline(12), timestamp(12), uid(4), gid(4), idstr(SKABUS_RPC_IDSTR_SIZE), '\0'(1) + msg : msglen + + + Sending a reply (rclient -> server) + + 'R' : 1 + serial : 8 + result : 1 + msg : msglen + + + Sending a reply (server -> qclient) + + 'R' : 1 + serial : 8 + status : 1. If not '\0', the rest is ignored. + result : 1 + msg : msglen + + + Cancelling a query (qclient -> server) + + 'C' : 1 + serial : 8 + + + Cancelling a query (server -> rclient) + + 'C' : 1 + ifid : 4 + serial : 8 + reason : 1 + + + Registering an interface (client -> server) + + 'I' : 1 + ifid : 4 + iflen : 1 + relen : 4 + ifname : iflen + '\0' : 1 + re : relen + '\0' : 1 + + + Unregistering an interface (client -> server) + + 'i' : 1 + iflen : 1 + ifname : iflen + '\0' : 1 + diff --git a/src/rpc/deps-exe/skabus-rpc-daemon b/src/rpc/deps-exe/skabus-rpc-daemon new file mode 100644 index 0000000..e7187fe --- /dev/null +++ b/src/rpc/deps-exe/skabus-rpc-daemon @@ -0,0 +1 @@ +-lskarnet diff --git a/src/rpc/deps-exe/skabus-rpcd b/src/rpc/deps-exe/skabus-rpcd new file mode 100644 index 0000000..a4a573f --- /dev/null +++ b/src/rpc/deps-exe/skabus-rpcd @@ -0,0 +1,8 @@ +skabus_rpcd_client.o +skabus_rpcd_interface.o +skabus_rpcd_query.o +libskabus.a.xyzzy +-ls6 +-lskarnet +${TAINNOW_LIB} +${SOCKET_LIB} diff --git a/src/rpc/skabus-rpc-daemon.c b/src/rpc/skabus-rpc-daemon.c new file mode 100644 index 0000000..20c5eea --- /dev/null +++ b/src/rpc/skabus-rpc-daemon.c @@ -0,0 +1,154 @@ +/* ISC license. */ + +#include <sys/types.h> +#include <limits.h> +#include <skalibs/types.h> +#include <skalibs/sgetopt.h> +#include <skalibs/strerr2.h> +#include <skalibs/djbunix.h> +#include <s6/config.h> +#include <skabus/config.h> + +#define USAGE "skabus-rpc-daemon [ -v verbosity ] [ -d | -D ] [ -1 ] [ -c maxconn ] [ -b backlog ] [ -G gid,gid,... ] [ -g gid ] [ -u uid ] [ -U ] [ -t timeout ] [ -T lameducktimeout ] [ -i rulesdir | -x rulesfile ] [ -S | -s ] [ -J | -j ] path" +#define dieusage() strerr_dieusage(100, USAGE) + +int main (int argc, char const *const *argv, char const *const *envp) +{ + unsigned int verbosity = 1 ; + int flag1 = 0 ; + int flagU = 0 ; + int flagreuse = 1 ; + uid_t uid = 0 ; + gid_t gid = 0 ; + gid_t gids[NGROUPS_MAX] ; + size_t gidn = (unsigned int)-1 ; + unsigned int maxconn = 0 ; + unsigned int backlog = (unsigned int)-1 ; + int flagpublic = 0 ; + int flagifpublic = 0 ; + unsigned int timeout = 0 ; + unsigned int ltimeout = 0 ; + char const *rulesdir = 0 ; + char const *rulesfile = 0 ; + PROG = "skabus-rpc-daemon" ; + { + subgetopt_t l = SUBGETOPT_ZERO ; + for (;;) + { + register int opt = subgetopt_r(argc, argv, "Dd1USsJjv:c:b:u:g:G:t:T:i:x:", &l) ; + if (opt == -1) break ; + switch (opt) + { + case 'D' : flagreuse = 0 ; break ; + case 'd' : flagreuse = 1 ; break ; + case '1' : flag1 = 1 ; break ; + case 'v' : if (!uint0_scan(l.arg, &verbosity)) dieusage() ; break ; + case 'c' : if (!uint0_scan(l.arg, &maxconn)) dieusage() ; if (!maxconn) maxconn = 1 ; break ; + case 'b' : if (!uint0_scan(l.arg, &backlog)) dieusage() ; break ; + case 'u' : if (!uid0_scan(l.arg, &uid)) dieusage() ; break ; + case 'g' : if (!gid0_scan(l.arg, &gid)) dieusage() ; break ; + case 'G' : if (!gid_scanlist(gids, NGROUPS_MAX, l.arg, &gidn) && *l.arg) dieusage() ; break ; + case 'U' : flagU = 1 ; uid = 0 ; gid = 0 ; gidn = (size_t)-1 ; break ; + case 'S' : flagpublic = 0 ; break ; + case 's' : flagpublic = 1 ; break ; + case 'J' : flagifpublic = 0 ; break ; + case 'j' : flagifpublic = 1 ; break ; + case 't' : if (!uint0_scan(l.arg, &timeout)) dieusage() ; break ; + case 'T' : if (!uint0_scan(l.arg, <imeout)) dieusage() ; break ; + case 'i' : rulesdir = l.arg ; rulesfile = 0 ; break ; + case 'x' : rulesfile = l.arg ; rulesdir = 0 ; break ; + default : dieusage() ; + } + } + argc -= l.ind ; argv += l.ind ; + if (!argc) dieusage() ; + } + + { + unsigned int m = 0, pos = 0 ; + char const *newargv[30] ; + char fmt[UINT_FMT * 6 + UID_FMT + GID_FMT * (1 + NGROUPS_MAX)] ; + newargv[m++] = S6_EXTBINPREFIX "s6-ipcserver-socketbinder" ; + if (!flagreuse) newargv[m++] = "-D" ; + if (backlog != (unsigned int)-1) + { + newargv[m++] = "-b" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, backlog) ; + fmt[pos++] = 0 ; + } + newargv[m++] = "--" ; + newargv[m++] = *argv++ ; + if (flagU || uid || gid || gidn != (size_t)-1) + { + newargv[m++] = S6_EXTBINPREFIX "s6-applyuidgid" ; + if (flagU) newargv[m++] = "-Uz" ; + if (uid) + { + newargv[m++] = "-u" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, uid) ; + fmt[pos++] = 0 ; + } + if (gid) + { + newargv[m++] = "-g" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, gid) ; + fmt[pos++] = 0 ; + } + if (gidn != (size_t)-1) + { + newargv[m++] = "-G" ; + newargv[m++] = fmt + pos ; + pos += gid_fmtlist(fmt + pos, gids, gidn) ; + fmt[pos++] = 0 ; + } + newargv[m++] = "--" ; + } + newargv[m++] = SKABUS_BINPREFIX "skabus-rpcd" ; + if (verbosity != 1) + { + newargv[m++] = "-v" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, verbosity) ; + fmt[pos++] = 0 ; + } + if (flag1) newargv[m++] = "-1" ; + if (flagpublic) newargv[m++] = "-s" ; + if (flagifpublic) newargv[m++] = "-j" ; + if (maxconn) + { + newargv[m++] = "-c" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, maxconn) ; + fmt[pos++] = 0 ; + } + if (timeout) + { + newargv[m++] = "-t" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, timeout) ; + fmt[pos++] = 0 ; + } + if (ltimeout) + { + newargv[m++] = "-T" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, timeout) ; + fmt[pos++] = 0 ; + } + if (rulesdir) + { + newargv[m++] = "-i" ; + newargv[m++] = rulesdir ; + } + else if (rulesfile) + { + newargv[m++] = "-x" ; + newargv[m++] = rulesfile ; + } + newargv[m++] = 0 ; + xpathexec_run(newargv[0], newargv, envp) ; + } +} diff --git a/src/rpc/skabus-rpcd.c b/src/rpc/skabus-rpcd.c new file mode 100644 index 0000000..36c6a78 --- /dev/null +++ b/src/rpc/skabus-rpcd.c @@ -0,0 +1,542 @@ +/* ISC license. */ + +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <stdint.h> +#include <fcntl.h> +#include <limits.h> +#include <regex.h> +#include <errno.h> +#include <signal.h> +#include <skalibs/uint32.h> +#include <skalibs/uint64.h> +#include <skalibs/types.h> +#include <skalibs/bytestr.h> +#include <skalibs/allreadwrite.h> +#include <skalibs/error.h> +#include <skalibs/cdb.h> +#include <skalibs/strerr2.h> +#include <skalibs/selfpipe.h> +#include <skalibs/sig.h> +#include <skalibs/djbunix.h> +#include <skalibs/sgetopt.h> +#include <skalibs/tai.h> +#include <skalibs/iopause.h> +#include <skalibs/env.h> +#include <skalibs/getpeereid.h> +#include <skalibs/webipc.h> +#include <skalibs/genset.h> +#include <skalibs/unixmessage.h> +#include <s6/accessrules.h> +#include <skabus/rpc.h> +#include "skabus-rpcd.h" + +#define USAGE "skabus-rpcd [ -v verbosity ] [ -1 ] [ -d | -D ] [ -c maxconn ] [ -t timeout ] [ -T lameducktimeout ] [ -i rulesdir | -x rulesfile ] [ -S | -s ] [ -J | -j ]" +#define dieusage() strerr_dieusage(100, USAGE) ; + +tain_t answertto = TAIN_INFINITE_RELATIVE ; + +static unsigned int verbosity = 1 ; +static int cont = 1 ; +static tain_t lameduckdeadline = TAIN_INFINITE_RELATIVE ; + +static unsigned int rulestype = 0 ; +static char const *rules = 0 ; +static int cdbfd = -1 ; +static struct cdb cdbmap = CDB_ZERO ; +static int flagidstrpub = 0, flaginterfacespub = 0 ; + +static inline void handle_signals (void) +{ + for (;;) switch (selfpipe_read()) + { + case -1 : strerr_diefu1sys(111, "selfpipe_read()") ; + case 0 : return ; + case SIGTERM : + { + if (cont) + { + cont = 0 ; + tain_add_g(&lameduckdeadline, &lameduckdeadline) ; + } + break ; + } + case SIGHUP : + { + int fd ; + struct cdb c = CDB_ZERO ; + if (rulestype != 2) break ; + fd = open_readb(rules) ; + if (fd < 0) break ; + if (cdb_init(&c, fd) < 0) + { + fd_close(fd) ; + break ; + } + cdb_free(&cdbmap) ; + fd_close(cdbfd) ; + cdbfd = fd ; + cdbmap = c ; + } + break ; + default : break ; + } +} + +int parse_protocol_async (unixmessage_t const *m, void *p) +{ + uint64_t serial ; + uint32_t qq ; + unixmessage_t mtosend = { .s = m->s + 10, .len = m->len - 10, .fds = m->fds, .nfds = m->nfds } ; + if (m->len < 10 || m->s[0] != 'R') + { + unixmessage_drop(m) ; + return (errno = EPROTO, 0) ; + } + uint64_unpack_big(m->s + 1, &serial) ; + if (!query_lookup_by_serial(serial, &qq)) + { + unixmessage_drop(m) ; + return 1 ; + } + query_reply(qq, m->s[9], &mtosend) ; + return 1 ; +} + +typedef int hlparsefunc_t (uint32_t, unixmessage_t *) ; +typedef hlparsefunc_t *hlparsefunc_t_ref ; + +static int answer (uint32_t cc, char e) +{ + unixmessage_t m = { .s = &e, .len = 1, .fds = 0, .nfds = 0 } ; + client_t *c = CLIENT(cc) ; + if (!unixmessage_put(&c->sync.out, &m)) return 0 ; + if (client_isregistered(cc)) client_setdeadline(c) ; + return 1 ; +} + +static int do_idstr (uint32_t cc, unixmessage_t *m) +{ + uint32_t relen, pmid, yy ; + unsigned char idlen ; + if (m->len < 11 || m->nfds) return (errno = EPROTO, 0) ; + uint32_unpack_big(m->s, &pmid) ; + idlen = m->s[4] ; + uint32_unpack_big(m->s + 5, &relen) ; + if (m->len != 11 + idlen + relen || m->s[9 + idlen] || m->s[10 + idlen + relen]) return (errno = EPROTO, 0) ; + if (client_isregistered(cc)) return answer(cc, EISCONN) ; + if (regexec(&CLIENT(cc)->idstr_re, m->s + 9, 0, 0, 0)) return answer(cc, EPERM) ; + m->s[8] = 0xff ; + if (interface_lookup_by_name(m->s + 8, &yy)) return answer(cc, EADDRINUSE) ; + if (!interface_add(&yy, m->s + 8, idlen + 1, cc, m->s + 10 + idlen, pmid)) + return answer(cc, errno) ; + regfree(&CLIENT(cc)->idstr_re) ; + return answer(cc, 0) ; +} + +static int do_interface_register (uint32_t cc, unixmessage_t *m) +{ + uint32_t id, relen, yy ; + unsigned char iflen ; + if (m->len < 11 || m->nfds) return (errno = EPROTO, 0) ; + uint32_unpack_big(m->s, &id) ; + iflen = m->s[4] ; + uint32_unpack_big(m->s + 5, &relen) ; + if (m->len != 11 + iflen + relen) return (errno = EPROTO, 0) ; + if (m->s[9 + iflen] || m->s[10 + iflen + relen]) return (errno = EPROTO, 0) ; + if (!client_isregistered(cc)) return answer(cc, ENOTCONN) ; + if (regexec(&CLIENT(cc)->interfaces_re, m->s + 9, 0, 0, 0)) return answer(cc, EPERM) ; + if (interface_lookup_by_name(m->s + 9, &yy)) return answer(cc, EADDRINUSE) ; + if (!interface_add(&yy, m->s + 9, iflen, cc, m->s + 10 + iflen, id)) return answer(cc, errno) ; + return answer(cc, 0) ; +} + +static int do_interface_unregister (uint32_t cc, unixmessage_t *m) +{ + uint32_t yy ; + unsigned char iflen ; + if (!m->len || m->nfds) return (errno = EPROTO, 0) ; + iflen = m->s[0] ; + if ((m->len != iflen + 2) || m->s[iflen+1]) return (errno = EPROTO, 0) ; + if (!client_isregistered(cc)) return answer(cc, ENOTCONN) ; + if (!iflen || ((unsigned char const *)m->s)[1] == 0xff) return answer(cc, EINVAL) ; + if (!interface_lookup_by_name(m->s + 1, &yy)) return answer(cc, errno) ; + if (INTERFACE(yy)->client != cc) return answer(cc, EPERM) ; + interface_remove(yy) ; + return answer(cc, 0) ; +} + +static int do_query_send (uint32_t cc, unixmessage_t *m) +{ + tain_t limit ; + uint32_t yy, qq ; + unsigned char iflen ; + int e ; + if (m->len < 2 + TAIN_PACK) return (errno = EPROTO, 0) ; + tain_unpack(m->s, &limit) ; + iflen = m->s[TAIN_PACK] ; + if (m->len < 2 + TAIN_PACK + iflen || m->s[TAIN_PACK + 1 + iflen]) return (errno = EPROTO, 0) ; + if (!client_isregistered(cc)) { e = ENOTCONN ; goto nope ; } + if (!interface_lookup_by_name(m->s + TAIN_PACK + 1, &yy)) { e = ESRCH ; goto nope ; } + if (regexec(&INTERFACE(yy)->re, client_idstr(CLIENT(cc)), 0, 0, 0)) { e = EPERM ; goto nope ; } + m->s += 2 + TAIN_PACK + iflen ; + m->len -= 2 + TAIN_PACK + iflen ; + if (!query_add(&qq, &limit, cc, yy) || !query_send(qq, m)) { e = errno ; goto nope ; } + return answer(cc, 0) ; + + nope: + if (!answer(cc, e)) return 0 ; + unixmessage_drop(m) ; + return 1 ; +} + +static int do_query_cancel (uint32_t cc, unixmessage_t *m) +{ + uint64_t serial ; + uint32_t qq ; + if (m->len != 8 || m->nfds) return (errno = EPROTO, 0) ; + if (!client_isregistered(cc)) return answer(cc, ENOTCONN) ; + uint64_unpack_big(m->s, &serial) ; + if (!query_lookup_by_serial(serial, &qq)) return answer(cc, errno) ; + if (cc != QUERY(qq)->client) return answer(cc, EPERM) ; + if (!query_cancelremove(qq, 0)) return answer(cc, errno) ; + return answer(cc, 0) ; +} + +static int do_error (uint32_t cc, unixmessage_t *m) +{ + (void)cc ; + (void)m ; + return (errno = EPROTO, 0) ; +} + +int parse_protocol_sync (unixmessage_t const *m, void *p) +{ + static hlparsefunc_t_ref const f[6] = + { + &do_idstr, + &do_interface_register, + &do_interface_unregister, + &do_query_send, + &do_query_cancel, + &do_error + } ; + unixmessage_t mcopy = { .s = m->s + 1, .len = m->len -1, .fds = m->fds, .nfds = m->nfds } ; + if (!m->len) + { + unixmessage_drop(m) ; + return (errno = EPROTO, 0) ; + } + if (!(*f[byte_chr("SIiQC", 5, m->s[0])])(*(uint32_t *)p, &mcopy)) + { + unixmessage_drop(m) ; + return 0 ; + } + return 1 ; +} + +static void removeclient (uint32_t *i, uint32_t j) +{ + if (verbosity >= 2) + { + char fmt[UINT32_FMT] ; + fmt[uint32_fmt(fmt, *i)] = 0 ; + strerr_warni2sys("removing client ", fmt) ; + } + client_remove(*i, j) ; + *i = j ; +} + +static int makere (regex_t *re, char const *s, char const *var) +{ + size_t varlen = strlen(var) ; + if (str_start(s, var) && (s[varlen] == '=')) + { + int r = regcomp(re, s + varlen + 1, REG_EXTENDED | REG_NOSUB) ; + if (r) + { + if (verbosity) + { + char buf[256] ; + regerror(r, re, buf, 256) ; + strerr_warnw6x("invalid ", var, " value: ", s + varlen + 1, ": ", buf) ; + } + return -1 ; + } + else return 1 ; + } + return 0 ; +} + +static void defaultre (regex_t *re, int flag) +{ + char const *s = flag ? ".*" : ".^" ; + int r = regcomp(re, s, REG_EXTENDED | REG_NOSUB) ; + if (r) + { + char buf[256] ; + regerror(r, re, buf, 256) ; + strerr_diefu4x(100, "compile ", s, " into a regular expression: ", buf) ; + } +} + +static inline int parse_env (char const *const *envp, regex_t *idstr_re, regex_t *interfaces_re, uint32_t *flags) +{ + uint32_t fl = 0 ; + int idstr_done = 0, interfaces_done = 0 ; + for (; *envp ; envp++) + { + if (str_start(*envp, "SKABUS_RPC_QSENDFDS=")) fl |= 1 ; + if (str_start(*envp, "SKABUS_RPC_RSENDFDS=")) fl |= 2 ; + if (!idstr_done) + { + idstr_done = makere(idstr_re, *envp, "SKABUS_RPC_ID_REGEX") ; + if (idstr_done < 0) + { + if (interfaces_done) regfree(interfaces_re) ; + return 0 ; + } + } + if (!interfaces_done) + { + interfaces_done = makere(interfaces_re, *envp, "SKABUS_RPC_INTERFACES_REGEX") ; + if (interfaces_done < 0) + { + if (idstr_done) regfree(idstr_re) ; + return 0 ; + } + } + if (idstr_done && interfaces_done) return 1 ; + } + if (!idstr_done) defaultre(idstr_re, flagidstrpub) ; + if (!interfaces_done) defaultre(interfaces_re, flaginterfacespub) ; + *flags = fl ; + return 1 ; +} + +static inline int new_connection (int fd, uid_t *uid, gid_t *gid, regex_t *idstr_re, regex_t *interfaces_re, uint32_t *flags) +{ + s6_accessrules_params_t params = S6_ACCESSRULES_PARAMS_ZERO ; + s6_accessrules_result_t result = S6_ACCESSRULES_ERROR ; + + if (getpeereid(fd, uid, gid) < 0) + { + if (verbosity) strerr_warnwu1sys("getpeereid") ; + return 0 ; + } + + switch (rulestype) + { + case 0 : + result = S6_ACCESSRULES_ALLOW ; break ; + case 1 : + result = s6_accessrules_uidgid_fs(*uid, *gid, rules, ¶ms) ; break ; + case 2 : + result = s6_accessrules_uidgid_cdb(*uid, *gid, &cdbmap, ¶ms) ; break ; + default : break ; + } + if (result != S6_ACCESSRULES_ALLOW) + { + if (verbosity && (result == S6_ACCESSRULES_ERROR)) + strerr_warnw1sys("error while checking rules") ; + return 0 ; + } + if (params.exec.s) + { + stralloc_free(¶ms.exec) ; + if (verbosity) + { + char fmtuid[UID_FMT] ; + char fmtgid[GID_FMT] ; + fmtuid[uid_fmt(fmtuid, *uid)] = 0 ; + fmtgid[gid_fmt(fmtgid, *gid)] = 0 ; + strerr_warnw4x("unused exec string in rules for uid ", fmtuid, " gid ", fmtgid) ; + } + } + if (params.env.s) + { + size_t n = byte_count(params.env.s, params.env.len, '\0') ; + char const *envp[n+1] ; + if (!env_make(envp, n, params.env.s, params.env.len)) + { + if (verbosity) strerr_warnwu1sys("env_make") ; + stralloc_free(¶ms.env) ; + return 0 ; + } + envp[n] = 0 ; + if (!parse_env(envp, idstr_re, interfaces_re, flags)) + { + if (verbosity) strerr_warnwu1sys("parse_env") ; + s6_accessrules_params_free(¶ms) ; + return 0 ; + } + s6_accessrules_params_free(¶ms) ; + } + return 1 ; +} + +int main (int argc, char const *const *argv, char const *const *envp) +{ + int spfd ; + int flag1 = 0 ; + uint32_t maxconn = 64 ; + PROG = "skabus-rpcd" ; + + { + subgetopt_t l = SUBGETOPT_ZERO ; + unsigned int t = 0, T = 0 ; + for (;;) + { + int opt = subgetopt_r(argc, argv, "v:SsJj1i:x:t:T:c:", &l) ; + if (opt == -1) break ; + switch (opt) + { + case 'v' : if (!uint0_scan(l.arg, &verbosity)) dieusage() ; break ; + case 'S' : flagidstrpub = 0 ; break ; + case 's' : flagidstrpub = 1 ; break ; + case 'J' : flaginterfacespub = 0 ; break ; + case 'j' : flaginterfacespub = 1 ; break ; + case '1' : flag1 = 1 ; break ; + case 'i' : rules = l.arg ; rulestype = 1 ; break ; + case 'x' : rules = l.arg ; rulestype = 2 ; break ; + case 't' : if (!uint0_scan(l.arg, &t)) dieusage() ; break ; + case 'T' : if (!uint0_scan(l.arg, &T)) dieusage() ; break ; + case 'c' : if (!uint320_scan(l.arg, &maxconn)) dieusage() ; break ; + default : dieusage() ; + } + } + argc -= l.ind ; argv += l.ind ; + if (t) tain_from_millisecs(&answertto, t) ; + if (T) tain_from_millisecs(&lameduckdeadline, T) ; + } + if (maxconn > SKABUS_RPC_MAX) maxconn = SKABUS_RPC_MAX ; + if (!maxconn) maxconn = 1 ; + { + struct stat st ; + if (fstat(0, &st) < 0) strerr_diefu1sys(111, "fstat stdin") ; + if (!S_ISSOCK(st.st_mode)) strerr_dief1x(100, "stdin is not a socket") ; + } + if (flag1) + { + if (fcntl(1, F_GETFD) < 0) + strerr_dief1sys(100, "called with option -1 but stdout said") ; + } + else close(1) ; + spfd = selfpipe_init() ; + if (spfd < 0) strerr_diefu1sys(111, "selfpipe_init") ; + if (sig_ignore(SIGPIPE) < 0) strerr_diefu1sys(111, "ignore SIGPIPE") ; + { + sigset_t set ; + sigemptyset(&set) ; + sigaddset(&set, SIGTERM) ; + sigaddset(&set, SIGHUP) ; + if (selfpipe_trapset(&set) < 0) strerr_diefu1sys(111, "trap signals") ; + } + + if (rulestype == 2) + { + cdbfd = open_readb(rules) ; + if (cdbfd < 0) strerr_diefu3sys(111, "open ", rules, " for reading") ; + if (cdb_init(&cdbmap, cdbfd) < 0) + strerr_diefu2sys(111, "cdb_init ", rules) ; + } + + { + GENSETB_TYPE(client_t, 1+maxconn) blob ; + iopause_fd x[2 + (maxconn << 1)] ; + GENSETB_init(client_t, &blob, 1+maxconn) ; + sentinel = gensetb_new(&blob) ; + blob.storage[sentinel].next = sentinel ; + clients = &blob.info ; + x[0].fd = spfd ; x[0].events = IOPAUSE_READ ; + x[1].fd = 0 ; + + if (flag1) + { + fd_write(1, "\n", 1) ; + fd_close(1) ; + } + tain_now_g() ; + + for (;;) + { + tain_t deadline ; + int r = 1 ; + uint32_t i = blob.storage[sentinel].next, j = 2 ; + query_get_mindeadline(&deadline) ; + if (!cont && tain_less(&lameduckdeadline, &deadline)) deadline = lameduckdeadline ; + if (queries_pending()) r = 0 ; + + x[1].events = (cont && (numconn < maxconn)) ? IOPAUSE_READ : 0 ; + for (; i != sentinel ; i = blob.storage[i].next) + if (client_prepare_iopause(i, &deadline, x, &j, cont)) r = 0 ; + if (!cont && r) break ; + + r = iopause_g(x, j, &deadline) ; + if (r < 0) strerr_diefu1sys(111, "iopause") ; + + + /* Timeout */ + + if (!r) + { + if (!cont && !tain_future(&lameduckdeadline)) return 1 ; + for (;;) + { + if (!query_lookup_by_mindeadline(&i)) break ; + if (tain_future(&QUERY(i)->deadline)) break ; + query_fail(i, ETIMEDOUT) ; + } + errno = ETIMEDOUT ; + for (i = blob.storage[sentinel].next, j = sentinel ; i != sentinel ; j = i, i = blob.storage[i].next) + if (!tain_future(&blob.storage[i].deadline)) removeclient(&i, j) ; + continue ; + } + + + /* Signal */ + + if (x[0].revents & IOPAUSE_READ) handle_signals() ; + + + /* Event */ + + for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next) + if (!client_flush(i, x)) removeclient(&i, j) ; + + for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next) + switch(client_read(i, x)) + { + case 0 : errno = 0 ; + case -1 : + case -2 : + { + removeclient(&i, j) ; + break ; + } + case 1 : break ; + default : X() ; + } + + + /* New connection */ + + if (x[1].revents & IOPAUSE_READ) + { + uint32_t flags = 0 ; + uid_t uid ; + gid_t gid ; + regex_t idstr_re, interfaces_re ; + int fd = ipc_accept_nb(x[1].fd, 0, 0, 0) ; + if (fd < 0) + if (!error_isagain(errno)) strerr_diefu1sys(111, "accept") ; + else continue ; + else if (!new_connection(fd, &uid, &gid, &idstr_re, &interfaces_re, &flags)) + fd_close(fd) ; + else client_add(&i, &idstr_re, &interfaces_re, uid, gid, fd, flags) ; + } + } + } + return 0 ; +} diff --git a/src/rpc/skabus-rpcd.h b/src/rpc/skabus-rpcd.h new file mode 100644 index 0000000..fec72f1 --- /dev/null +++ b/src/rpc/skabus-rpcd.h @@ -0,0 +1,146 @@ +/* ISC license. */ + +#ifndef SKABUS_RPCD_H +#define SKABUS_RPCD_H + +#include <sys/types.h> +#include <stdint.h> +#include <regex.h> +#include <skalibs/uint64.h> +#include <skalibs/tai.h> +#include <skalibs/iopause.h> +#include <skalibs/genalloc.h> +#include <skalibs/genset.h> +#include <skalibs/gensetdyn.h> +#include <skalibs/unixmessage.h> +#include <skalibs/unixconnection.h> +#include <skabus/rpc.h> + +#define X() strerr_dief1x(101, "unexpected error - please submit a bug-report.") ; + + + /* + query: queries accepted from client, sent to interface + The list is stored in a gensetdyn. + Looked up by serial for answers or cancels. + */ + +typedef struct query_s query_t, *query_t_ref ; +struct query_s +{ + uint64_t serial ; + tain_t deadline ; + uint32_t client ; + uint32_t clientindex ; + uint32_t interface ; + uint32_t interfaceindex ; +} ; + +#define QUERY_ZERO \ +{ \ + .serial = 0, \ + .deadline = TAIN_ZERO, \ + .client = 0, \ + .clientindex = 0, \ + .interface = 0, \ + .interfaceindex = 0 \ +} + + + /* + interfaces: registered R interfaces. + The list is stored in a gensetdyn. + Looked up by name. + */ + +typedef struct interface_s interface_t, *interface_t_ref ; +struct interface_s +{ + char name[SKABUS_RPC_INTERFACE_MAXLEN+1] ; + regex_t re ; /* clients who can access that interface */ + uint32_t id ; + uint32_t client ; + uint32_t index ; /* in the owner's interfaces list */ + gensetdyn queries ; /* uint32_t */ +} ; +#define INTERFACE_ZERO { .name = "", .id = 0, .client = 0, .index = 0, .queries = GENSETDYN_ZERO } + + + /* + client: client connections. + The list is stored in a genset. + List browsed at every iopause iteration, so needs a next field. + */ + +typedef struct client_s client_t, *client_t_ref ; +struct client_s +{ + uint32_t next ; + uid_t uid ; + gid_t gid ; + tain_t deadline ; + genalloc interfaces ; /* uint32_t */ + gensetdyn queries ; /* uint32_t */ + unixconnection_t sync ; + unixconnection_t async ; + uint32_t xindex[2] ; + regex_t idstr_re ; + regex_t interfaces_re ; +} ; +#define CLIENT_ZERO \ +{ \ + .next = 0, \ + .uid = (uid_t)-1, \ + .gid = (gid_t)-1, \ + .deadline = TAIN_ZERO, \ + .interfaces = GENALLOC_ZERO, \ + .queries = GENALLOC_ZERO, \ + .sync = UNIXCONNECTION_ZERO, \ + .async = UNIXCONNECTION_ZERO, \ + .xindex = { 0, 0 }, \ +} + +extern gensetdyn queries ; +#define QUERY(i) GENSETDYN_P(query_t, &queries, (i)) +#define queries_pending() gensetdyn_n(&queries) + +extern gensetdyn interfaces ; +#define INTERFACE(i) GENSETDYN_P(interface_t, &interfaces, (i)) + +extern genset *clients ; +extern unsigned int sentinel ; +#define CLIENT(i) genset_p(client_t, clients, (i)) +#define numconn (genset_n(clients) - 1) + +extern void query_remove (uint32_t) ; +extern void query_fail (uint32_t, char) ; +extern int query_cancel (uint32_t, char) ; +extern int query_cancelremove (uint32_t, char) ; +extern int query_lookup_by_serial (uint64_t, uint32_t *) ; +extern int query_lookup_by_mindeadline (uint32_t *) ; +extern void query_get_mindeadline (tain_t *) ; +extern int query_add (uint32_t *, tain_t const *, uint32_t, uint32_t) ; +extern int query_send (uint32_t, unixmessage_t const *) ; +extern int query_sendpm (uint32_t, unixmessage_t const *) ; +extern void query_reply (uint32_t, char, unixmessage_t const *) ; + +extern void interface_remove (uint32_t) ; +extern int interface_lookup_by_name (char const *, uint32_t *) ; +extern int interface_add (uint32_t *, char const *, size_t, uint32_t, char const *, uint32_t) ; + +#define client_isregistered(cc) genalloc_len(uint32_t, &CLIENT(cc)->interfaces) +#define client_idstr(c) (INTERFACE(genalloc_s(uint32_t, &(c)->interfaces)[0])->name + 1) +extern void client_remove (uint32_t, uint32_t) ; +extern void client_add (uint32_t *, regex_t const *, regex_t const *, uid_t, gid_t, int, uint32_t) ; +extern void client_nextdeadline (uint32_t, tain_t *) ; +extern void client_setdeadline (client_t *) ; +extern int client_prepare_iopause (uint32_t, tain_t *, iopause_fd *, uint32_t *, int) ; +extern int client_flush (uint32_t, iopause_fd const *) ; +extern int client_read (uint32_t, iopause_fd const *) ; + +extern int parse_protocol_sync (unixmessage_t const *, void *) ; +extern int parse_protocol_async (unixmessage_t const *, void *) ; + +extern tain_t answertto ; + +#endif diff --git a/src/rpc/skabus_rpcd_client.c b/src/rpc/skabus_rpcd_client.c new file mode 100644 index 0000000..deac459 --- /dev/null +++ b/src/rpc/skabus_rpcd_client.c @@ -0,0 +1,188 @@ +/* ISC license. */ + +#include <sys/types.h> +#include <stdint.h> +#include <errno.h> +#include <regex.h> +#include <skalibs/djbunix.h> +#include <skalibs/error.h> +#include <skalibs/strerr2.h> +#include <skalibs/tai.h> +#include <skalibs/iopause.h> +#include <skalibs/genalloc.h> +#include <skalibs/genset.h> +#include <skalibs/gensetdyn.h> +#include <skalibs/unixmessage.h> +#include <skalibs/unixconnection.h> +#include <skalibs/skaclient.h> +#include <skabus/rpc.h> +#include "skabus-rpcd.h" + +static inline void client_free (client_t *c) +{ + if (!genalloc_len(uint32_t, &c->interfaces)) regfree(&c->idstr_re) ; + regfree(&c->interfaces_re) ; + genalloc_free(unsigned int, &c->interfaces) ; + gensetdyn_free(&c->queries) ; + fd_close(unixmessage_sender_fd(&c->sync.out)) ; + unixconnection_free(&c->sync) ; + if (unixmessage_sender_fd(&c->async.out) >= 0) + { + fd_close(unixmessage_sender_fd(&c->async.out)) ; + unixconnection_free(&c->async) ; + } +} + +genset *clients ; +unsigned int sentinel ; + +static inline void client_delete (uint32_t i, uint32_t prev) +{ + CLIENT(prev)->next = CLIENT(i)->next ; + client_free(CLIENT(i)) ; + genset_delete(clients, i) ; +} + +static int query_cancelremove_iter (char *s, void *reason) +{ + uint32_t i = *(uint32_t *)s ; + return query_cancelremove(i, *(char *)reason) ; +} + +void client_remove (uint32_t i, uint32_t prev) +{ + client_t *c = CLIENT(i) ; + char reason = ECONNABORTED ; + if (gensetdyn_iter(&c->queries, &query_cancelremove_iter, &reason) < gensetdyn_n(&c->queries)) + strerr_diefu1sys(111, "query_cancelremove_iter in client_remove") ; + while (genalloc_len(uint32_t, &c->interfaces)) + interface_remove(genalloc_s(uint32_t, &c->interfaces)[genalloc_len(uint32_t, &c->interfaces) - 1]) ; + client_delete(i, prev) ; +} + +void client_setdeadline (client_t *c) +{ + tain_t blah ; + tain_half(&blah, &tain_infinite_relative) ; + tain_add_g(&blah, &blah) ; + if (tain_less(&blah, &c->deadline)) tain_add_g(&c->deadline, &answertto) ; +} + +void client_add (uint32_t *d, regex_t const *idstr_re, regex_t const *interfaces_re, uid_t uid, gid_t gid, int fdsock, uint32_t flags) +{ + uint32_t cc = genset_new(clients) ; + client_t *c = CLIENT(cc) ; + c->next = CLIENT(sentinel)->next ; + c->uid = uid ; + c->gid = gid ; + tain_add_g(&c->deadline, &answertto) ; + c->interfaces = genalloc_zero ; + c->queries = gensetdyn_zero ; + c->idstr_re = *idstr_re ; + c->interfaces_re = *interfaces_re ; + unixconnection_init(&c->sync, fdsock, fdsock) ; + unixconnection_init(&c->async, -1, -1) ; + c->async.out.fd = -(int)flags-1 ; + CLIENT(sentinel)->next = cc ; + *d = cc ; +} + +void client_nextdeadline (uint32_t i, tain_t *deadline) +{ + client_t *c = CLIENT(i) ; + if (tain_less(&c->deadline, deadline)) *deadline = c->deadline ; +} + +int client_prepare_iopause (uint32_t cc, tain_t *deadline, iopause_fd *x, uint32_t *j, int notlameduck) +{ + client_t *c = CLIENT(cc) ; + int inflight = 0 ; + uint32_t i = genalloc_len(uint32_t, &c->interfaces) ; + if (tain_less(&c->deadline, deadline)) *deadline = c->deadline ; + if (!unixmessage_sender_isempty(&c->sync.out) | !unixmessage_receiver_isempty(&c->sync.in) || (notlameduck && !unixmessage_receiver_isfull(&c->sync.in))) + { + x[*j].fd = unixmessage_sender_fd(&c->sync.out) ; + x[*j].events = ((!unixmessage_receiver_isempty(&c->sync.in) || (notlameduck && !unixmessage_receiver_isfull(&c->sync.in))) ? IOPAUSE_READ : 0) + | (!unixmessage_sender_isempty(&c->sync.out) ? IOPAUSE_WRITE : 0) ; + c->xindex[0] = (*j)++ ; + } + else c->xindex[0] = 0 ; + while (i--) + { + interface_t *y = INTERFACE(genalloc_s(uint32_t, &c->interfaces)[i]) ; + if (gensetdyn_n(&y->queries)) + { + inflight = 1 ; + break ; + } + } + if (!unixmessage_sender_isempty(&c->async.out) || !unixmessage_receiver_isempty(&c->async.in) || inflight) + { + x[*j].fd = unixmessage_sender_fd(&c->async.out) ; + x[*j].events = (unixmessage_sender_isempty(&c->async.out) ? IOPAUSE_WRITE : 0) + | (!unixmessage_receiver_isempty(&c->async.in) || inflight ? IOPAUSE_READ : 0) ; + c->xindex[1] = (*j)++ ; + } + else c->xindex[1] = 0 ; + return c->xindex[0] || c->xindex[1] ; +} + +int client_flush (uint32_t i, iopause_fd const *x) +{ + client_t *c = CLIENT(i) ; + int isflushed = 2 ; + if (c->xindex[0] && (x[c->xindex[0]].revents & IOPAUSE_WRITE)) + { + if (!unixmessage_sender_flush(&c->sync.out)) + if (!error_isagain(errno)) return 0 ; + else isflushed = 0 ; + else isflushed = 1 ; + } + + if (c->xindex[1] && (x[c->xindex[1]].revents & IOPAUSE_WRITE)) + { + if (!unixmessage_sender_flush(&c->async.out)) + if (!error_isagain(errno)) return 0 ; + else isflushed = 0 ; + else isflushed = !!isflushed ; + } + + if (isflushed == 1) tain_add_g(&c->deadline, &tain_infinite_relative) ; + return 1 ; +} + +int client_read (uint32_t cc, iopause_fd const *x) +{ + client_t *c = CLIENT(cc) ; + if (!unixmessage_receiver_isempty(&c->sync.in) || (c->xindex[0] && x[c->xindex[0]].revents & IOPAUSE_READ)) + { + if (unixmessage_sender_fd(&c->async.out) < 0) + { + unixmessage_t m ; + int r = unixmessage_receive(&c->sync.in, &m) ; + if (r < 0) return -1 ; + if (r) + { + uint32_t flags = -(unixmessage_sender_fd(&c->async.out) + 1) ; + if (!skaclient_server_bidi_ack(&m, &c->sync.out, &c->async.out, &c->async.in, c->async.mainbuf, UNIXMESSAGE_BUFSIZE, c->async.auxbuf, UNIXMESSAGE_AUXBUFSIZE, SKABUS_RPC_BANNER1, SKABUS_RPC_BANNER1_LEN, SKABUS_RPC_BANNER2, SKABUS_RPC_BANNER2_LEN)) + { + unixmessage_drop(&m) ; + return -1 ; + } + if (!(flags & 1)) unixmessage_receiver_refuse_fds(&c->sync.in) ; + if (!(flags & 2)) unixmessage_receiver_refuse_fds(&c->async.in) ; + } + } + else + { + int r = unixmessage_handle(&c->sync.in, &parse_protocol_sync, &cc) ; + if (r <= 0) return r ; + } + } + if (!unixmessage_receiver_isempty(&c->async.in) || (c->xindex[1] && x[c->xindex[1]].revents & IOPAUSE_READ)) + { + int r = unixmessage_handle(&c->async.in, &parse_protocol_async, &cc) ; + if (r <= 0) return r ; + } + return 1 ; +} diff --git a/src/rpc/skabus_rpcd_interface.c b/src/rpc/skabus_rpcd_interface.c new file mode 100644 index 0000000..458b2d8 --- /dev/null +++ b/src/rpc/skabus_rpcd_interface.c @@ -0,0 +1,113 @@ +/* ISC license. */ + +#include <string.h> +#include <stdint.h> +#include <errno.h> +#include <regex.h> +#include <skalibs/strerr2.h> +#include <skalibs/genalloc.h> +#include <skalibs/gensetdyn.h> +#include <skalibs/avltree.h> +#include <skabus/rpc.h> +#include "skabus-rpcd.h" + +static inline void interface_free (interface_t *p) +{ + p->name[0] = p->name[1] = 0 ; + gensetdyn_free(&p->queries) ; +} + +static void *if_dtok (uint32_t d, void *x) +{ + (void)x ; + return INTERFACE(d)->name ; +} + +static int if_cmp (void const *a, void const *b, void *x) +{ + (void)x ; + return strncmp((char const *)a, (char const *)b, SKABUS_RPC_INTERFACE_MAXLEN) ; +} + +gensetdyn interfaces = GENSETDYN_ZERO ; +static avltree ifdict = AVLTREE_INIT(2, 3, 8, &if_dtok, &if_cmp, 0) ; + +static inline void interface_delete (uint32_t i) +{ + interface_t *y = INTERFACE(i) ; + if (!avltree_delete(&ifdict, y->name)) + strerr_diefu1sys(111, "avltree_delete in interface_delete") ; + interface_free(y) ; + if (!gensetdyn_delete(&interfaces, i)) + strerr_diefu1sys(111, "gensetdyn_delete in interface_delete") ; +} + +static int query_fail_iter (char *s, void *reason) +{ + uint32_t i = *(uint32_t *)s ; + query_fail(i, *(char *)reason) ; + return 1 ; +} + +static inline void client_interfacemove (client_t *c, uint32_t from, uint32_t to) +{ + uint32_t *ifaces = genalloc_s(uint32_t, &c->interfaces) ; + INTERFACE(ifaces[from])->index = to ; + ifaces[to] = ifaces[from] ; +} + +void interface_remove (uint32_t i) + { + interface_t *y = INTERFACE(i) ; + client_t *c = CLIENT(y->client) ; + uint32_t n = gensetdyn_n(&y->queries) ; + char reason = ECONNRESET ; + gensetdyn_iter(&y->queries, &query_fail_iter, &reason) ; + n = genalloc_len(uint32_t, &c->interfaces) ; + client_interfacemove(c, n-1, y->index) ; + genalloc_setlen(uint32_t, &c->interfaces, n-1) ; + interface_delete(i) ; +} + +int interface_lookup_by_name (char const *s, uint32_t *d) +{ + return avltree_search(&ifdict, s, d) ; +} + +int interface_add (uint32_t *d, char const *name, size_t namelen, uint32_t client, char const *re, uint32_t id) +{ + uint32_t yy ; + int e ; + genalloc *g = &CLIENT(client)->interfaces ; + if (!genalloc_readyplus(uint32_t, g, 1)) return 0 ; + if (!gensetdyn_new(&interfaces, &yy)) return 0 ; + { + interface_t *y = INTERFACE(yy) ; + int r = regcomp(&y->re, re, REG_EXTENDED | REG_NOSUB) ; + if (r) + { + e = r == REG_ESPACE ? ENOMEM : EINVAL ; + goto err ; + } + memcpy(y->name, name, namelen) ; y->name[namelen] = 0 ; + y->id = id ; + y->client = client ; + y->index = genalloc_len(uint32_t, g) - 1 ; + y->queries = gensetdyn_zero ; + if (!avltree_insert(&ifdict, yy)) + { + e = errno ; + regfree(&y->re) ; + goto err ; + } + } + genalloc_append(uint32_t, g, &yy) ; + *d = yy ; + return 1 ; + + err: + if (!gensetdyn_delete(&interfaces, yy)) + strerr_diefu1sys(111, "gensetdyn_delete in interface_add") ; + errno = e ; + return 0 ; +} diff --git a/src/rpc/skabus_rpcd_query.c b/src/rpc/skabus_rpcd_query.c new file mode 100644 index 0000000..baa6df5 --- /dev/null +++ b/src/rpc/skabus_rpcd_query.c @@ -0,0 +1,207 @@ + /* ISC license. */ + +#include <sys/uio.h> +#include <string.h> +#include <stdint.h> +#include <skalibs/uint32.h> +#include <skalibs/uint64.h> +#include <skalibs/tai.h> +#include <skalibs/strerr2.h> +#include <skalibs/genalloc.h> +#include <skalibs/gensetdyn.h> +#include <skalibs/avltree.h> +#include <skalibs/unixmessage.h> +#include <skabus/rpc.h> +#include "skabus-rpcd.h" + +static void *query_serial_dtok (uint32_t d, void *x) +{ + (void)x ; + return &QUERY(d)->serial ; +} + +static void *query_deadline_dtok (uint32_t d, void *x) +{ + (void)x ; + return &QUERY(d)->deadline ; +} + +static int query_serial_cmp (void const *a, void const *b, void *x) +{ + uint64_t aa = *(uint64_t *)a ; + uint64_t bb = *(uint64_t *)b ; + (void)x ; + return aa < bb ? -1 : aa > bb ; +} + +static int query_deadline_cmp (void const *a, void const *b, void *x) +{ + tain_t const *aa = (tain_t const *)a ; + tain_t const *bb = (tain_t const *)b ; + (void)x ; + return tain_less(aa, bb) ? -1 : tain_less(bb, aa) ; +} + +gensetdyn queries = GENSETDYN_ZERO ; +static avltree qserialdict = AVLTREE_INIT(10, 1, 2, &query_serial_dtok, &query_serial_cmp, 0) ; +static avltree qdeadlinedict = AVLTREE_INIT(10, 1, 2, &query_deadline_dtok, &query_deadline_cmp, 0) ; + +static inline void query_delete (uint32_t i) +{ + if (!avltree_delete(&qdeadlinedict, &QUERY(i)->deadline)) + strerr_diefu1sys(111, "avltree_delete qdeadlinedict in query_delete") ; + if (!avltree_delete(&qserialdict, &QUERY(i)->serial)) + strerr_diefu1sys(111, "avltree_delete qserialdict in query_delete") ; + if (!gensetdyn_delete(&queries, i)) + strerr_diefu1sys(111, "gensetdyn_delete in query_delete") ; +} + +void query_remove (uint32_t i) +{ + query_t *q = QUERY(i) ; + client_t *c = CLIENT(q->client) ; + interface_t *y = INTERFACE(q->interface) ; + if (!gensetdyn_delete(&c->queries, q->clientindex)) + strerr_diefu1sys(111, "gensetdyn_delete c->queries in query_remove") ; + if (!gensetdyn_delete(&y->queries, q->interfaceindex)) + strerr_diefu1sys(111, "gensetdyn_delete y->queries in query_remove") ; + query_delete(i) ; +} + +void query_fail (uint32_t i, char status) +{ + query_t *q = QUERY(i) ; + client_t *c = CLIENT(q->client) ; + char pack[10] = "Rssssssssr" ; + unixmessage_t m = { .s = pack, .len = 10, .fds = 0, .nfds = 0 } ; + uint64_pack_big(pack+1, q->serial) ; + pack[9] = status ; + if (!unixmessage_put(&c->async.out, &m)) + strerr_diefu1sys(111, "unixmessage_put in query_fail") ; + query_remove(i) ; + client_setdeadline(c) ; +} + +int query_cancel (uint32_t i, char reason) +{ + query_t *q = QUERY(i) ; + interface_t *y = INTERFACE(q->interface) ; + client_t *c = CLIENT(y->client) ; + char pack[14] = "Ciiiissssssssr" ; + unixmessage_t m = { .s = pack, .len = 14, .fds = 0, .nfds = 0 } ; + uint32_pack_big(pack+1, y->id) ; + uint64_pack_big(pack+5, q->serial) ; + pack[13] = reason ; + if (!unixmessage_put(&c->async.out, &m)) return 0 ; + client_setdeadline(c) ; + return 1 ; +} + +int query_cancelremove (uint32_t i, char reason) +{ + if (!query_cancel(i, reason)) return 0 ; + query_remove(i) ; + return 1 ; +} + +int query_lookup_by_serial (uint64_t serial, uint32_t *d) +{ + return avltree_search(&qserialdict, &serial, d) ; +} + +int query_lookup_by_mindeadline (uint32_t *d) +{ + return avltree_min(&qdeadlinedict, d) ; +} + +void query_get_mindeadline (tain_t *deadline) +{ + uint32_t d ; + if (query_lookup_by_mindeadline(&d)) *deadline = QUERY(d)->deadline ; + else tain_add_g(deadline, &tain_infinite_relative) ; +} + +int query_add (uint32_t *d, tain_t const *deadline, uint32_t client, uint32_t interface) +{ + static uint64_t serial = 1 ; + uint32_t qq, cc, yy ; + query_t *q ; + if (!gensetdyn_new(&queries, &qq)) return 0 ; + if (!gensetdyn_new(&CLIENT(client)->queries, &cc)) goto end0 ; + if (!gensetdyn_new(&INTERFACE(interface)->queries, &yy)) goto end1 ; + q = QUERY(qq) ; + q->serial = serial ; + q->deadline = *deadline ; + q->client = client ; + q->clientindex = cc ; + q->interface = interface ; + q->interfaceindex = yy ; + if (!avltree_insert(&qserialdict, qq)) goto end2 ; + for (;;) + { + static tain_t const nano1 = { .sec = TAI_ZERO, .nano = 1 } ; + uint32_t d ; + if (!avltree_search(&qdeadlinedict, &q->deadline, &d)) break ; + tain_add(&q->deadline, &q->deadline, &nano1) ; + } + if (!avltree_insert(&qdeadlinedict, qq)) goto end3 ; + serial++ ; + *d = qq ; + return 1 ; + + end3: + if (!avltree_delete(&qserialdict, &serial)) + strerr_diefu1sys(111, "avltree_delete in query_add") ; + end2: + if (!gensetdyn_delete(&INTERFACE(interface)->queries, yy)) + strerr_diefu1sys(111, "gensetdyn_delete INTERFACE(interface)->queries in query_add") ; + end1: + if (!gensetdyn_delete(&CLIENT(client)->queries, cc)) + strerr_diefu1sys(111, "gensetdyn_delete CLIENT(client)->queries in query_add") ; + end0: + if (!gensetdyn_delete(&queries, qq)) + strerr_diefu1sys(111, "gensetdyn_delete queries in query_add") ; + return 0 ; +} + +int query_send (uint32_t qq, unixmessage_t const *m) +{ + skabus_rpc_rinfo_t rinfo ; + char pack[4 + SKABUS_RPC_RINFO_PACK] = "Q" ; + struct iovec v[2] = { { .iov_base = pack, .iov_len = 4 + SKABUS_RPC_RINFO_PACK }, { .iov_base = m->s, .iov_len = m->len } } ; + unixmessage_v_t mtosend = { .v = v, .vlen = 2, .fds = m->fds, .nfds = m->nfds } ; + query_t *q = QUERY(qq) ; + interface_t *y = INTERFACE(q->interface) ; + client_t *c = CLIENT(q->client) ; + char const *idstr = client_idstr(c) ; + size_t idstrlen = strlen(idstr) ; + rinfo.serial = q->serial ; + rinfo.limit = q->deadline ; + tain_copynow(&rinfo.timestamp) ; + rinfo.uid = c->uid ; + rinfo.gid = c->gid ; + memcpy(rinfo.idstr, idstr, idstrlen) ; + memset(rinfo.idstr + idstrlen, 0, SKABUS_RPC_IDSTR_SIZE + 1 - idstrlen) ; + uint32_pack_big(pack+1, y->id) ; + skabus_rpc_rinfo_pack(pack + 5, &rinfo) ; + c = CLIENT(y->client) ; + if (!unixmessage_putv_and_close(&c->async.out, &mtosend, unixmessage_bits_closeall)) return 0 ; + client_setdeadline(c) ; + return 1 ; +} + +void query_reply (uint32_t qq, char result, unixmessage_t const *m) +{ + char pack[11] = "R" ; + struct iovec v[2] = { { .iov_base = pack, .iov_len = 11 }, { .iov_base = m->s, .iov_len = m->len } } ; + unixmessage_v_t mtosend = { .v = v, .vlen = 2, .fds = m->fds, .nfds = m->nfds } ; + query_t *q = QUERY(qq) ; + client_t *c = CLIENT(q->client) ; + uint64_pack_big(pack+1, q->serial) ; + pack[9] = 0 ; + pack[10] = result ; + if (!unixmessage_putv_and_close(&c->async.out, &mtosend, unixmessage_bits_closeall)) + strerr_diefu1sys(111, "unixmessage_put in query_reply") ; + client_setdeadline(c) ; + query_remove(qq) ; +} |