summaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorLaurent Bercot <ska-skaware@skarnet.org>2017-11-24 18:54:03 +0000
committerLaurent Bercot <ska-skaware@skarnet.org>2017-11-24 18:54:03 +0000
commit6432cead1941d5305bc2d7f22821ca8a98f43f78 (patch)
tree1898075255d0e9ac285a22326356c564c1b6c839 /src/rpc
parent2c5c19923ec56298c4e6b4a71084b27414fd96d6 (diff)
downloadskabus-6432cead1941d5305bc2d7f22821ca8a98f43f78.tar.xz
Add skabus-rpc-daemon, skabus-rpcd and the skabus_rpc library
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/PROTOCOL82
-rw-r--r--src/rpc/deps-exe/skabus-rpc-daemon1
-rw-r--r--src/rpc/deps-exe/skabus-rpcd8
-rw-r--r--src/rpc/skabus-rpc-daemon.c154
-rw-r--r--src/rpc/skabus-rpcd.c542
-rw-r--r--src/rpc/skabus-rpcd.h146
-rw-r--r--src/rpc/skabus_rpcd_client.c188
-rw-r--r--src/rpc/skabus_rpcd_interface.c113
-rw-r--r--src/rpc/skabus_rpcd_query.c207
9 files changed, 1441 insertions, 0 deletions
diff --git a/src/rpc/PROTOCOL b/src/rpc/PROTOCOL
new file mode 100644
index 0000000..4022556
--- /dev/null
+++ b/src/rpc/PROTOCOL
@@ -0,0 +1,82 @@
+
+ Registering (1st connection) (client -> server)
+
+ 'S' : 1
+ pmid : 4
+ idlen : 1
+ relen : 4
+ idstr : idlen
+ '\0' : 1
+ re : relen
+ '\0' : 1
+
+ Sending a query (qclient -> server)
+
+ 'Q' : 1
+ deadline : TAIN_PACK (12)
+ contains sec(8), nano(4)
+ iflen : 1
+ ifname : iflen
+ '\0' : 1
+ msg : msglen
+
+
+ Sending a query (server -> rclient)
+
+ 'Q' : 1
+ ifid : 4
+ rinfo: SKABUS_RPC_RINFO_PACK
+ contains serial(8), deadline(12), timestamp(12), uid(4), gid(4), idstr(SKABUS_RPC_IDSTR_SIZE), '\0'(1)
+ msg : msglen
+
+
+ Sending a reply (rclient -> server)
+
+ 'R' : 1
+ serial : 8
+ result : 1
+ msg : msglen
+
+
+ Sending a reply (server -> qclient)
+
+ 'R' : 1
+ serial : 8
+ status : 1. If not '\0', the rest is ignored.
+ result : 1
+ msg : msglen
+
+
+ Cancelling a query (qclient -> server)
+
+ 'C' : 1
+ serial : 8
+
+
+ Cancelling a query (server -> rclient)
+
+ 'C' : 1
+ ifid : 4
+ serial : 8
+ reason : 1
+
+
+ Registering an interface (client -> server)
+
+ 'I' : 1
+ ifid : 4
+ iflen : 1
+ relen : 4
+ ifname : iflen
+ '\0' : 1
+ re : relen
+ '\0' : 1
+
+
+ Unregistering an interface (client -> server)
+
+ 'i' : 1
+ iflen : 1
+ ifname : iflen
+ '\0' : 1
+
diff --git a/src/rpc/deps-exe/skabus-rpc-daemon b/src/rpc/deps-exe/skabus-rpc-daemon
new file mode 100644
index 0000000..e7187fe
--- /dev/null
+++ b/src/rpc/deps-exe/skabus-rpc-daemon
@@ -0,0 +1 @@
+-lskarnet
diff --git a/src/rpc/deps-exe/skabus-rpcd b/src/rpc/deps-exe/skabus-rpcd
new file mode 100644
index 0000000..a4a573f
--- /dev/null
+++ b/src/rpc/deps-exe/skabus-rpcd
@@ -0,0 +1,8 @@
+skabus_rpcd_client.o
+skabus_rpcd_interface.o
+skabus_rpcd_query.o
+libskabus.a.xyzzy
+-ls6
+-lskarnet
+${TAINNOW_LIB}
+${SOCKET_LIB}
diff --git a/src/rpc/skabus-rpc-daemon.c b/src/rpc/skabus-rpc-daemon.c
new file mode 100644
index 0000000..20c5eea
--- /dev/null
+++ b/src/rpc/skabus-rpc-daemon.c
@@ -0,0 +1,154 @@
+/* ISC license. */
+
+#include <sys/types.h>
+#include <limits.h>
+#include <skalibs/types.h>
+#include <skalibs/sgetopt.h>
+#include <skalibs/strerr2.h>
+#include <skalibs/djbunix.h>
+#include <s6/config.h>
+#include <skabus/config.h>
+
+#define USAGE "skabus-rpc-daemon [ -v verbosity ] [ -d | -D ] [ -1 ] [ -c maxconn ] [ -b backlog ] [ -G gid,gid,... ] [ -g gid ] [ -u uid ] [ -U ] [ -t timeout ] [ -T lameducktimeout ] [ -i rulesdir | -x rulesfile ] [ -S | -s ] [ -J | -j ] path"
+#define dieusage() strerr_dieusage(100, USAGE)
+
+int main (int argc, char const *const *argv, char const *const *envp)
+{
+ unsigned int verbosity = 1 ;
+ int flag1 = 0 ;
+ int flagU = 0 ;
+ int flagreuse = 1 ;
+ uid_t uid = 0 ;
+ gid_t gid = 0 ;
+ gid_t gids[NGROUPS_MAX] ;
+ size_t gidn = (unsigned int)-1 ;
+ unsigned int maxconn = 0 ;
+ unsigned int backlog = (unsigned int)-1 ;
+ int flagpublic = 0 ;
+ int flagifpublic = 0 ;
+ unsigned int timeout = 0 ;
+ unsigned int ltimeout = 0 ;
+ char const *rulesdir = 0 ;
+ char const *rulesfile = 0 ;
+ PROG = "skabus-rpc-daemon" ;
+ {
+ subgetopt_t l = SUBGETOPT_ZERO ;
+ for (;;)
+ {
+ register int opt = subgetopt_r(argc, argv, "Dd1USsJjv:c:b:u:g:G:t:T:i:x:", &l) ;
+ if (opt == -1) break ;
+ switch (opt)
+ {
+ case 'D' : flagreuse = 0 ; break ;
+ case 'd' : flagreuse = 1 ; break ;
+ case '1' : flag1 = 1 ; break ;
+ case 'v' : if (!uint0_scan(l.arg, &verbosity)) dieusage() ; break ;
+ case 'c' : if (!uint0_scan(l.arg, &maxconn)) dieusage() ; if (!maxconn) maxconn = 1 ; break ;
+ case 'b' : if (!uint0_scan(l.arg, &backlog)) dieusage() ; break ;
+ case 'u' : if (!uid0_scan(l.arg, &uid)) dieusage() ; break ;
+ case 'g' : if (!gid0_scan(l.arg, &gid)) dieusage() ; break ;
+ case 'G' : if (!gid_scanlist(gids, NGROUPS_MAX, l.arg, &gidn) && *l.arg) dieusage() ; break ;
+ case 'U' : flagU = 1 ; uid = 0 ; gid = 0 ; gidn = (size_t)-1 ; break ;
+ case 'S' : flagpublic = 0 ; break ;
+ case 's' : flagpublic = 1 ; break ;
+ case 'J' : flagifpublic = 0 ; break ;
+ case 'j' : flagifpublic = 1 ; break ;
+ case 't' : if (!uint0_scan(l.arg, &timeout)) dieusage() ; break ;
+ case 'T' : if (!uint0_scan(l.arg, &ltimeout)) dieusage() ; break ;
+ case 'i' : rulesdir = l.arg ; rulesfile = 0 ; break ;
+ case 'x' : rulesfile = l.arg ; rulesdir = 0 ; break ;
+ default : dieusage() ;
+ }
+ }
+ argc -= l.ind ; argv += l.ind ;
+ if (!argc) dieusage() ;
+ }
+
+ {
+ unsigned int m = 0, pos = 0 ;
+ char const *newargv[30] ;
+ char fmt[UINT_FMT * 6 + UID_FMT + GID_FMT * (1 + NGROUPS_MAX)] ;
+ newargv[m++] = S6_EXTBINPREFIX "s6-ipcserver-socketbinder" ;
+ if (!flagreuse) newargv[m++] = "-D" ;
+ if (backlog != (unsigned int)-1)
+ {
+ newargv[m++] = "-b" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, backlog) ;
+ fmt[pos++] = 0 ;
+ }
+ newargv[m++] = "--" ;
+ newargv[m++] = *argv++ ;
+ if (flagU || uid || gid || gidn != (size_t)-1)
+ {
+ newargv[m++] = S6_EXTBINPREFIX "s6-applyuidgid" ;
+ if (flagU) newargv[m++] = "-Uz" ;
+ if (uid)
+ {
+ newargv[m++] = "-u" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, uid) ;
+ fmt[pos++] = 0 ;
+ }
+ if (gid)
+ {
+ newargv[m++] = "-g" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, gid) ;
+ fmt[pos++] = 0 ;
+ }
+ if (gidn != (size_t)-1)
+ {
+ newargv[m++] = "-G" ;
+ newargv[m++] = fmt + pos ;
+ pos += gid_fmtlist(fmt + pos, gids, gidn) ;
+ fmt[pos++] = 0 ;
+ }
+ newargv[m++] = "--" ;
+ }
+ newargv[m++] = SKABUS_BINPREFIX "skabus-rpcd" ;
+ if (verbosity != 1)
+ {
+ newargv[m++] = "-v" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, verbosity) ;
+ fmt[pos++] = 0 ;
+ }
+ if (flag1) newargv[m++] = "-1" ;
+ if (flagpublic) newargv[m++] = "-s" ;
+ if (flagifpublic) newargv[m++] = "-j" ;
+ if (maxconn)
+ {
+ newargv[m++] = "-c" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, maxconn) ;
+ fmt[pos++] = 0 ;
+ }
+ if (timeout)
+ {
+ newargv[m++] = "-t" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, timeout) ;
+ fmt[pos++] = 0 ;
+ }
+ if (ltimeout)
+ {
+ newargv[m++] = "-T" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, timeout) ;
+ fmt[pos++] = 0 ;
+ }
+ if (rulesdir)
+ {
+ newargv[m++] = "-i" ;
+ newargv[m++] = rulesdir ;
+ }
+ else if (rulesfile)
+ {
+ newargv[m++] = "-x" ;
+ newargv[m++] = rulesfile ;
+ }
+ newargv[m++] = 0 ;
+ xpathexec_run(newargv[0], newargv, envp) ;
+ }
+}
diff --git a/src/rpc/skabus-rpcd.c b/src/rpc/skabus-rpcd.c
new file mode 100644
index 0000000..36c6a78
--- /dev/null
+++ b/src/rpc/skabus-rpcd.c
@@ -0,0 +1,542 @@
+/* ISC license. */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <regex.h>
+#include <errno.h>
+#include <signal.h>
+#include <skalibs/uint32.h>
+#include <skalibs/uint64.h>
+#include <skalibs/types.h>
+#include <skalibs/bytestr.h>
+#include <skalibs/allreadwrite.h>
+#include <skalibs/error.h>
+#include <skalibs/cdb.h>
+#include <skalibs/strerr2.h>
+#include <skalibs/selfpipe.h>
+#include <skalibs/sig.h>
+#include <skalibs/djbunix.h>
+#include <skalibs/sgetopt.h>
+#include <skalibs/tai.h>
+#include <skalibs/iopause.h>
+#include <skalibs/env.h>
+#include <skalibs/getpeereid.h>
+#include <skalibs/webipc.h>
+#include <skalibs/genset.h>
+#include <skalibs/unixmessage.h>
+#include <s6/accessrules.h>
+#include <skabus/rpc.h>
+#include "skabus-rpcd.h"
+
+#define USAGE "skabus-rpcd [ -v verbosity ] [ -1 ] [ -d | -D ] [ -c maxconn ] [ -t timeout ] [ -T lameducktimeout ] [ -i rulesdir | -x rulesfile ] [ -S | -s ] [ -J | -j ]"
+#define dieusage() strerr_dieusage(100, USAGE) ;
+
+tain_t answertto = TAIN_INFINITE_RELATIVE ;
+
+static unsigned int verbosity = 1 ;
+static int cont = 1 ;
+static tain_t lameduckdeadline = TAIN_INFINITE_RELATIVE ;
+
+static unsigned int rulestype = 0 ;
+static char const *rules = 0 ;
+static int cdbfd = -1 ;
+static struct cdb cdbmap = CDB_ZERO ;
+static int flagidstrpub = 0, flaginterfacespub = 0 ;
+
+static inline void handle_signals (void)
+{
+ for (;;) switch (selfpipe_read())
+ {
+ case -1 : strerr_diefu1sys(111, "selfpipe_read()") ;
+ case 0 : return ;
+ case SIGTERM :
+ {
+ if (cont)
+ {
+ cont = 0 ;
+ tain_add_g(&lameduckdeadline, &lameduckdeadline) ;
+ }
+ break ;
+ }
+ case SIGHUP :
+ {
+ int fd ;
+ struct cdb c = CDB_ZERO ;
+ if (rulestype != 2) break ;
+ fd = open_readb(rules) ;
+ if (fd < 0) break ;
+ if (cdb_init(&c, fd) < 0)
+ {
+ fd_close(fd) ;
+ break ;
+ }
+ cdb_free(&cdbmap) ;
+ fd_close(cdbfd) ;
+ cdbfd = fd ;
+ cdbmap = c ;
+ }
+ break ;
+ default : break ;
+ }
+}
+
+int parse_protocol_async (unixmessage_t const *m, void *p)
+{
+ uint64_t serial ;
+ uint32_t qq ;
+ unixmessage_t mtosend = { .s = m->s + 10, .len = m->len - 10, .fds = m->fds, .nfds = m->nfds } ;
+ if (m->len < 10 || m->s[0] != 'R')
+ {
+ unixmessage_drop(m) ;
+ return (errno = EPROTO, 0) ;
+ }
+ uint64_unpack_big(m->s + 1, &serial) ;
+ if (!query_lookup_by_serial(serial, &qq))
+ {
+ unixmessage_drop(m) ;
+ return 1 ;
+ }
+ query_reply(qq, m->s[9], &mtosend) ;
+ return 1 ;
+}
+
+typedef int hlparsefunc_t (uint32_t, unixmessage_t *) ;
+typedef hlparsefunc_t *hlparsefunc_t_ref ;
+
+static int answer (uint32_t cc, char e)
+{
+ unixmessage_t m = { .s = &e, .len = 1, .fds = 0, .nfds = 0 } ;
+ client_t *c = CLIENT(cc) ;
+ if (!unixmessage_put(&c->sync.out, &m)) return 0 ;
+ if (client_isregistered(cc)) client_setdeadline(c) ;
+ return 1 ;
+}
+
+static int do_idstr (uint32_t cc, unixmessage_t *m)
+{
+ uint32_t relen, pmid, yy ;
+ unsigned char idlen ;
+ if (m->len < 11 || m->nfds) return (errno = EPROTO, 0) ;
+ uint32_unpack_big(m->s, &pmid) ;
+ idlen = m->s[4] ;
+ uint32_unpack_big(m->s + 5, &relen) ;
+ if (m->len != 11 + idlen + relen || m->s[9 + idlen] || m->s[10 + idlen + relen]) return (errno = EPROTO, 0) ;
+ if (client_isregistered(cc)) return answer(cc, EISCONN) ;
+ if (regexec(&CLIENT(cc)->idstr_re, m->s + 9, 0, 0, 0)) return answer(cc, EPERM) ;
+ m->s[8] = 0xff ;
+ if (interface_lookup_by_name(m->s + 8, &yy)) return answer(cc, EADDRINUSE) ;
+ if (!interface_add(&yy, m->s + 8, idlen + 1, cc, m->s + 10 + idlen, pmid))
+ return answer(cc, errno) ;
+ regfree(&CLIENT(cc)->idstr_re) ;
+ return answer(cc, 0) ;
+}
+
+static int do_interface_register (uint32_t cc, unixmessage_t *m)
+{
+ uint32_t id, relen, yy ;
+ unsigned char iflen ;
+ if (m->len < 11 || m->nfds) return (errno = EPROTO, 0) ;
+ uint32_unpack_big(m->s, &id) ;
+ iflen = m->s[4] ;
+ uint32_unpack_big(m->s + 5, &relen) ;
+ if (m->len != 11 + iflen + relen) return (errno = EPROTO, 0) ;
+ if (m->s[9 + iflen] || m->s[10 + iflen + relen]) return (errno = EPROTO, 0) ;
+ if (!client_isregistered(cc)) return answer(cc, ENOTCONN) ;
+ if (regexec(&CLIENT(cc)->interfaces_re, m->s + 9, 0, 0, 0)) return answer(cc, EPERM) ;
+ if (interface_lookup_by_name(m->s + 9, &yy)) return answer(cc, EADDRINUSE) ;
+ if (!interface_add(&yy, m->s + 9, iflen, cc, m->s + 10 + iflen, id)) return answer(cc, errno) ;
+ return answer(cc, 0) ;
+}
+
+static int do_interface_unregister (uint32_t cc, unixmessage_t *m)
+{
+ uint32_t yy ;
+ unsigned char iflen ;
+ if (!m->len || m->nfds) return (errno = EPROTO, 0) ;
+ iflen = m->s[0] ;
+ if ((m->len != iflen + 2) || m->s[iflen+1]) return (errno = EPROTO, 0) ;
+ if (!client_isregistered(cc)) return answer(cc, ENOTCONN) ;
+ if (!iflen || ((unsigned char const *)m->s)[1] == 0xff) return answer(cc, EINVAL) ;
+ if (!interface_lookup_by_name(m->s + 1, &yy)) return answer(cc, errno) ;
+ if (INTERFACE(yy)->client != cc) return answer(cc, EPERM) ;
+ interface_remove(yy) ;
+ return answer(cc, 0) ;
+}
+
+static int do_query_send (uint32_t cc, unixmessage_t *m)
+{
+ tain_t limit ;
+ uint32_t yy, qq ;
+ unsigned char iflen ;
+ int e ;
+ if (m->len < 2 + TAIN_PACK) return (errno = EPROTO, 0) ;
+ tain_unpack(m->s, &limit) ;
+ iflen = m->s[TAIN_PACK] ;
+ if (m->len < 2 + TAIN_PACK + iflen || m->s[TAIN_PACK + 1 + iflen]) return (errno = EPROTO, 0) ;
+ if (!client_isregistered(cc)) { e = ENOTCONN ; goto nope ; }
+ if (!interface_lookup_by_name(m->s + TAIN_PACK + 1, &yy)) { e = ESRCH ; goto nope ; }
+ if (regexec(&INTERFACE(yy)->re, client_idstr(CLIENT(cc)), 0, 0, 0)) { e = EPERM ; goto nope ; }
+ m->s += 2 + TAIN_PACK + iflen ;
+ m->len -= 2 + TAIN_PACK + iflen ;
+ if (!query_add(&qq, &limit, cc, yy) || !query_send(qq, m)) { e = errno ; goto nope ; }
+ return answer(cc, 0) ;
+
+ nope:
+ if (!answer(cc, e)) return 0 ;
+ unixmessage_drop(m) ;
+ return 1 ;
+}
+
+static int do_query_cancel (uint32_t cc, unixmessage_t *m)
+{
+ uint64_t serial ;
+ uint32_t qq ;
+ if (m->len != 8 || m->nfds) return (errno = EPROTO, 0) ;
+ if (!client_isregistered(cc)) return answer(cc, ENOTCONN) ;
+ uint64_unpack_big(m->s, &serial) ;
+ if (!query_lookup_by_serial(serial, &qq)) return answer(cc, errno) ;
+ if (cc != QUERY(qq)->client) return answer(cc, EPERM) ;
+ if (!query_cancelremove(qq, 0)) return answer(cc, errno) ;
+ return answer(cc, 0) ;
+}
+
+static int do_error (uint32_t cc, unixmessage_t *m)
+{
+ (void)cc ;
+ (void)m ;
+ return (errno = EPROTO, 0) ;
+}
+
+int parse_protocol_sync (unixmessage_t const *m, void *p)
+{
+ static hlparsefunc_t_ref const f[6] =
+ {
+ &do_idstr,
+ &do_interface_register,
+ &do_interface_unregister,
+ &do_query_send,
+ &do_query_cancel,
+ &do_error
+ } ;
+ unixmessage_t mcopy = { .s = m->s + 1, .len = m->len -1, .fds = m->fds, .nfds = m->nfds } ;
+ if (!m->len)
+ {
+ unixmessage_drop(m) ;
+ return (errno = EPROTO, 0) ;
+ }
+ if (!(*f[byte_chr("SIiQC", 5, m->s[0])])(*(uint32_t *)p, &mcopy))
+ {
+ unixmessage_drop(m) ;
+ return 0 ;
+ }
+ return 1 ;
+}
+
+static void removeclient (uint32_t *i, uint32_t j)
+{
+ if (verbosity >= 2)
+ {
+ char fmt[UINT32_FMT] ;
+ fmt[uint32_fmt(fmt, *i)] = 0 ;
+ strerr_warni2sys("removing client ", fmt) ;
+ }
+ client_remove(*i, j) ;
+ *i = j ;
+}
+
+static int makere (regex_t *re, char const *s, char const *var)
+{
+ size_t varlen = strlen(var) ;
+ if (str_start(s, var) && (s[varlen] == '='))
+ {
+ int r = regcomp(re, s + varlen + 1, REG_EXTENDED | REG_NOSUB) ;
+ if (r)
+ {
+ if (verbosity)
+ {
+ char buf[256] ;
+ regerror(r, re, buf, 256) ;
+ strerr_warnw6x("invalid ", var, " value: ", s + varlen + 1, ": ", buf) ;
+ }
+ return -1 ;
+ }
+ else return 1 ;
+ }
+ return 0 ;
+}
+
+static void defaultre (regex_t *re, int flag)
+{
+ char const *s = flag ? ".*" : ".^" ;
+ int r = regcomp(re, s, REG_EXTENDED | REG_NOSUB) ;
+ if (r)
+ {
+ char buf[256] ;
+ regerror(r, re, buf, 256) ;
+ strerr_diefu4x(100, "compile ", s, " into a regular expression: ", buf) ;
+ }
+}
+
+static inline int parse_env (char const *const *envp, regex_t *idstr_re, regex_t *interfaces_re, uint32_t *flags)
+{
+ uint32_t fl = 0 ;
+ int idstr_done = 0, interfaces_done = 0 ;
+ for (; *envp ; envp++)
+ {
+ if (str_start(*envp, "SKABUS_RPC_QSENDFDS=")) fl |= 1 ;
+ if (str_start(*envp, "SKABUS_RPC_RSENDFDS=")) fl |= 2 ;
+ if (!idstr_done)
+ {
+ idstr_done = makere(idstr_re, *envp, "SKABUS_RPC_ID_REGEX") ;
+ if (idstr_done < 0)
+ {
+ if (interfaces_done) regfree(interfaces_re) ;
+ return 0 ;
+ }
+ }
+ if (!interfaces_done)
+ {
+ interfaces_done = makere(interfaces_re, *envp, "SKABUS_RPC_INTERFACES_REGEX") ;
+ if (interfaces_done < 0)
+ {
+ if (idstr_done) regfree(idstr_re) ;
+ return 0 ;
+ }
+ }
+ if (idstr_done && interfaces_done) return 1 ;
+ }
+ if (!idstr_done) defaultre(idstr_re, flagidstrpub) ;
+ if (!interfaces_done) defaultre(interfaces_re, flaginterfacespub) ;
+ *flags = fl ;
+ return 1 ;
+}
+
+static inline int new_connection (int fd, uid_t *uid, gid_t *gid, regex_t *idstr_re, regex_t *interfaces_re, uint32_t *flags)
+{
+ s6_accessrules_params_t params = S6_ACCESSRULES_PARAMS_ZERO ;
+ s6_accessrules_result_t result = S6_ACCESSRULES_ERROR ;
+
+ if (getpeereid(fd, uid, gid) < 0)
+ {
+ if (verbosity) strerr_warnwu1sys("getpeereid") ;
+ return 0 ;
+ }
+
+ switch (rulestype)
+ {
+ case 0 :
+ result = S6_ACCESSRULES_ALLOW ; break ;
+ case 1 :
+ result = s6_accessrules_uidgid_fs(*uid, *gid, rules, &params) ; break ;
+ case 2 :
+ result = s6_accessrules_uidgid_cdb(*uid, *gid, &cdbmap, &params) ; break ;
+ default : break ;
+ }
+ if (result != S6_ACCESSRULES_ALLOW)
+ {
+ if (verbosity && (result == S6_ACCESSRULES_ERROR))
+ strerr_warnw1sys("error while checking rules") ;
+ return 0 ;
+ }
+ if (params.exec.s)
+ {
+ stralloc_free(&params.exec) ;
+ if (verbosity)
+ {
+ char fmtuid[UID_FMT] ;
+ char fmtgid[GID_FMT] ;
+ fmtuid[uid_fmt(fmtuid, *uid)] = 0 ;
+ fmtgid[gid_fmt(fmtgid, *gid)] = 0 ;
+ strerr_warnw4x("unused exec string in rules for uid ", fmtuid, " gid ", fmtgid) ;
+ }
+ }
+ if (params.env.s)
+ {
+ size_t n = byte_count(params.env.s, params.env.len, '\0') ;
+ char const *envp[n+1] ;
+ if (!env_make(envp, n, params.env.s, params.env.len))
+ {
+ if (verbosity) strerr_warnwu1sys("env_make") ;
+ stralloc_free(&params.env) ;
+ return 0 ;
+ }
+ envp[n] = 0 ;
+ if (!parse_env(envp, idstr_re, interfaces_re, flags))
+ {
+ if (verbosity) strerr_warnwu1sys("parse_env") ;
+ s6_accessrules_params_free(&params) ;
+ return 0 ;
+ }
+ s6_accessrules_params_free(&params) ;
+ }
+ return 1 ;
+}
+
+int main (int argc, char const *const *argv, char const *const *envp)
+{
+ int spfd ;
+ int flag1 = 0 ;
+ uint32_t maxconn = 64 ;
+ PROG = "skabus-rpcd" ;
+
+ {
+ subgetopt_t l = SUBGETOPT_ZERO ;
+ unsigned int t = 0, T = 0 ;
+ for (;;)
+ {
+ int opt = subgetopt_r(argc, argv, "v:SsJj1i:x:t:T:c:", &l) ;
+ if (opt == -1) break ;
+ switch (opt)
+ {
+ case 'v' : if (!uint0_scan(l.arg, &verbosity)) dieusage() ; break ;
+ case 'S' : flagidstrpub = 0 ; break ;
+ case 's' : flagidstrpub = 1 ; break ;
+ case 'J' : flaginterfacespub = 0 ; break ;
+ case 'j' : flaginterfacespub = 1 ; break ;
+ case '1' : flag1 = 1 ; break ;
+ case 'i' : rules = l.arg ; rulestype = 1 ; break ;
+ case 'x' : rules = l.arg ; rulestype = 2 ; break ;
+ case 't' : if (!uint0_scan(l.arg, &t)) dieusage() ; break ;
+ case 'T' : if (!uint0_scan(l.arg, &T)) dieusage() ; break ;
+ case 'c' : if (!uint320_scan(l.arg, &maxconn)) dieusage() ; break ;
+ default : dieusage() ;
+ }
+ }
+ argc -= l.ind ; argv += l.ind ;
+ if (t) tain_from_millisecs(&answertto, t) ;
+ if (T) tain_from_millisecs(&lameduckdeadline, T) ;
+ }
+ if (maxconn > SKABUS_RPC_MAX) maxconn = SKABUS_RPC_MAX ;
+ if (!maxconn) maxconn = 1 ;
+ {
+ struct stat st ;
+ if (fstat(0, &st) < 0) strerr_diefu1sys(111, "fstat stdin") ;
+ if (!S_ISSOCK(st.st_mode)) strerr_dief1x(100, "stdin is not a socket") ;
+ }
+ if (flag1)
+ {
+ if (fcntl(1, F_GETFD) < 0)
+ strerr_dief1sys(100, "called with option -1 but stdout said") ;
+ }
+ else close(1) ;
+ spfd = selfpipe_init() ;
+ if (spfd < 0) strerr_diefu1sys(111, "selfpipe_init") ;
+ if (sig_ignore(SIGPIPE) < 0) strerr_diefu1sys(111, "ignore SIGPIPE") ;
+ {
+ sigset_t set ;
+ sigemptyset(&set) ;
+ sigaddset(&set, SIGTERM) ;
+ sigaddset(&set, SIGHUP) ;
+ if (selfpipe_trapset(&set) < 0) strerr_diefu1sys(111, "trap signals") ;
+ }
+
+ if (rulestype == 2)
+ {
+ cdbfd = open_readb(rules) ;
+ if (cdbfd < 0) strerr_diefu3sys(111, "open ", rules, " for reading") ;
+ if (cdb_init(&cdbmap, cdbfd) < 0)
+ strerr_diefu2sys(111, "cdb_init ", rules) ;
+ }
+
+ {
+ GENSETB_TYPE(client_t, 1+maxconn) blob ;
+ iopause_fd x[2 + (maxconn << 1)] ;
+ GENSETB_init(client_t, &blob, 1+maxconn) ;
+ sentinel = gensetb_new(&blob) ;
+ blob.storage[sentinel].next = sentinel ;
+ clients = &blob.info ;
+ x[0].fd = spfd ; x[0].events = IOPAUSE_READ ;
+ x[1].fd = 0 ;
+
+ if (flag1)
+ {
+ fd_write(1, "\n", 1) ;
+ fd_close(1) ;
+ }
+ tain_now_g() ;
+
+ for (;;)
+ {
+ tain_t deadline ;
+ int r = 1 ;
+ uint32_t i = blob.storage[sentinel].next, j = 2 ;
+ query_get_mindeadline(&deadline) ;
+ if (!cont && tain_less(&lameduckdeadline, &deadline)) deadline = lameduckdeadline ;
+ if (queries_pending()) r = 0 ;
+
+ x[1].events = (cont && (numconn < maxconn)) ? IOPAUSE_READ : 0 ;
+ for (; i != sentinel ; i = blob.storage[i].next)
+ if (client_prepare_iopause(i, &deadline, x, &j, cont)) r = 0 ;
+ if (!cont && r) break ;
+
+ r = iopause_g(x, j, &deadline) ;
+ if (r < 0) strerr_diefu1sys(111, "iopause") ;
+
+
+ /* Timeout */
+
+ if (!r)
+ {
+ if (!cont && !tain_future(&lameduckdeadline)) return 1 ;
+ for (;;)
+ {
+ if (!query_lookup_by_mindeadline(&i)) break ;
+ if (tain_future(&QUERY(i)->deadline)) break ;
+ query_fail(i, ETIMEDOUT) ;
+ }
+ errno = ETIMEDOUT ;
+ for (i = blob.storage[sentinel].next, j = sentinel ; i != sentinel ; j = i, i = blob.storage[i].next)
+ if (!tain_future(&blob.storage[i].deadline)) removeclient(&i, j) ;
+ continue ;
+ }
+
+
+ /* Signal */
+
+ if (x[0].revents & IOPAUSE_READ) handle_signals() ;
+
+
+ /* Event */
+
+ for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next)
+ if (!client_flush(i, x)) removeclient(&i, j) ;
+
+ for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next)
+ switch(client_read(i, x))
+ {
+ case 0 : errno = 0 ;
+ case -1 :
+ case -2 :
+ {
+ removeclient(&i, j) ;
+ break ;
+ }
+ case 1 : break ;
+ default : X() ;
+ }
+
+
+ /* New connection */
+
+ if (x[1].revents & IOPAUSE_READ)
+ {
+ uint32_t flags = 0 ;
+ uid_t uid ;
+ gid_t gid ;
+ regex_t idstr_re, interfaces_re ;
+ int fd = ipc_accept_nb(x[1].fd, 0, 0, 0) ;
+ if (fd < 0)
+ if (!error_isagain(errno)) strerr_diefu1sys(111, "accept") ;
+ else continue ;
+ else if (!new_connection(fd, &uid, &gid, &idstr_re, &interfaces_re, &flags))
+ fd_close(fd) ;
+ else client_add(&i, &idstr_re, &interfaces_re, uid, gid, fd, flags) ;
+ }
+ }
+ }
+ return 0 ;
+}
diff --git a/src/rpc/skabus-rpcd.h b/src/rpc/skabus-rpcd.h
new file mode 100644
index 0000000..fec72f1
--- /dev/null
+++ b/src/rpc/skabus-rpcd.h
@@ -0,0 +1,146 @@
+/* ISC license. */
+
+#ifndef SKABUS_RPCD_H
+#define SKABUS_RPCD_H
+
+#include <sys/types.h>
+#include <stdint.h>
+#include <regex.h>
+#include <skalibs/uint64.h>
+#include <skalibs/tai.h>
+#include <skalibs/iopause.h>
+#include <skalibs/genalloc.h>
+#include <skalibs/genset.h>
+#include <skalibs/gensetdyn.h>
+#include <skalibs/unixmessage.h>
+#include <skalibs/unixconnection.h>
+#include <skabus/rpc.h>
+
+#define X() strerr_dief1x(101, "unexpected error - please submit a bug-report.") ;
+
+
+ /*
+ query: queries accepted from client, sent to interface
+ The list is stored in a gensetdyn.
+ Looked up by serial for answers or cancels.
+ */
+
+typedef struct query_s query_t, *query_t_ref ;
+struct query_s
+{
+ uint64_t serial ;
+ tain_t deadline ;
+ uint32_t client ;
+ uint32_t clientindex ;
+ uint32_t interface ;
+ uint32_t interfaceindex ;
+} ;
+
+#define QUERY_ZERO \
+{ \
+ .serial = 0, \
+ .deadline = TAIN_ZERO, \
+ .client = 0, \
+ .clientindex = 0, \
+ .interface = 0, \
+ .interfaceindex = 0 \
+}
+
+
+ /*
+ interfaces: registered R interfaces.
+ The list is stored in a gensetdyn.
+ Looked up by name.
+ */
+
+typedef struct interface_s interface_t, *interface_t_ref ;
+struct interface_s
+{
+ char name[SKABUS_RPC_INTERFACE_MAXLEN+1] ;
+ regex_t re ; /* clients who can access that interface */
+ uint32_t id ;
+ uint32_t client ;
+ uint32_t index ; /* in the owner's interfaces list */
+ gensetdyn queries ; /* uint32_t */
+} ;
+#define INTERFACE_ZERO { .name = "", .id = 0, .client = 0, .index = 0, .queries = GENSETDYN_ZERO }
+
+
+ /*
+ client: client connections.
+ The list is stored in a genset.
+ List browsed at every iopause iteration, so needs a next field.
+ */
+
+typedef struct client_s client_t, *client_t_ref ;
+struct client_s
+{
+ uint32_t next ;
+ uid_t uid ;
+ gid_t gid ;
+ tain_t deadline ;
+ genalloc interfaces ; /* uint32_t */
+ gensetdyn queries ; /* uint32_t */
+ unixconnection_t sync ;
+ unixconnection_t async ;
+ uint32_t xindex[2] ;
+ regex_t idstr_re ;
+ regex_t interfaces_re ;
+} ;
+#define CLIENT_ZERO \
+{ \
+ .next = 0, \
+ .uid = (uid_t)-1, \
+ .gid = (gid_t)-1, \
+ .deadline = TAIN_ZERO, \
+ .interfaces = GENALLOC_ZERO, \
+ .queries = GENALLOC_ZERO, \
+ .sync = UNIXCONNECTION_ZERO, \
+ .async = UNIXCONNECTION_ZERO, \
+ .xindex = { 0, 0 }, \
+}
+
+extern gensetdyn queries ;
+#define QUERY(i) GENSETDYN_P(query_t, &queries, (i))
+#define queries_pending() gensetdyn_n(&queries)
+
+extern gensetdyn interfaces ;
+#define INTERFACE(i) GENSETDYN_P(interface_t, &interfaces, (i))
+
+extern genset *clients ;
+extern unsigned int sentinel ;
+#define CLIENT(i) genset_p(client_t, clients, (i))
+#define numconn (genset_n(clients) - 1)
+
+extern void query_remove (uint32_t) ;
+extern void query_fail (uint32_t, char) ;
+extern int query_cancel (uint32_t, char) ;
+extern int query_cancelremove (uint32_t, char) ;
+extern int query_lookup_by_serial (uint64_t, uint32_t *) ;
+extern int query_lookup_by_mindeadline (uint32_t *) ;
+extern void query_get_mindeadline (tain_t *) ;
+extern int query_add (uint32_t *, tain_t const *, uint32_t, uint32_t) ;
+extern int query_send (uint32_t, unixmessage_t const *) ;
+extern int query_sendpm (uint32_t, unixmessage_t const *) ;
+extern void query_reply (uint32_t, char, unixmessage_t const *) ;
+
+extern void interface_remove (uint32_t) ;
+extern int interface_lookup_by_name (char const *, uint32_t *) ;
+extern int interface_add (uint32_t *, char const *, size_t, uint32_t, char const *, uint32_t) ;
+
+#define client_isregistered(cc) genalloc_len(uint32_t, &CLIENT(cc)->interfaces)
+#define client_idstr(c) (INTERFACE(genalloc_s(uint32_t, &(c)->interfaces)[0])->name + 1)
+extern void client_remove (uint32_t, uint32_t) ;
+extern void client_add (uint32_t *, regex_t const *, regex_t const *, uid_t, gid_t, int, uint32_t) ;
+extern void client_nextdeadline (uint32_t, tain_t *) ;
+extern void client_setdeadline (client_t *) ;
+extern int client_prepare_iopause (uint32_t, tain_t *, iopause_fd *, uint32_t *, int) ;
+extern int client_flush (uint32_t, iopause_fd const *) ;
+extern int client_read (uint32_t, iopause_fd const *) ;
+
+extern int parse_protocol_sync (unixmessage_t const *, void *) ;
+extern int parse_protocol_async (unixmessage_t const *, void *) ;
+
+extern tain_t answertto ;
+
+#endif
diff --git a/src/rpc/skabus_rpcd_client.c b/src/rpc/skabus_rpcd_client.c
new file mode 100644
index 0000000..deac459
--- /dev/null
+++ b/src/rpc/skabus_rpcd_client.c
@@ -0,0 +1,188 @@
+/* ISC license. */
+
+#include <sys/types.h>
+#include <stdint.h>
+#include <errno.h>
+#include <regex.h>
+#include <skalibs/djbunix.h>
+#include <skalibs/error.h>
+#include <skalibs/strerr2.h>
+#include <skalibs/tai.h>
+#include <skalibs/iopause.h>
+#include <skalibs/genalloc.h>
+#include <skalibs/genset.h>
+#include <skalibs/gensetdyn.h>
+#include <skalibs/unixmessage.h>
+#include <skalibs/unixconnection.h>
+#include <skalibs/skaclient.h>
+#include <skabus/rpc.h>
+#include "skabus-rpcd.h"
+
+static inline void client_free (client_t *c)
+{
+ if (!genalloc_len(uint32_t, &c->interfaces)) regfree(&c->idstr_re) ;
+ regfree(&c->interfaces_re) ;
+ genalloc_free(unsigned int, &c->interfaces) ;
+ gensetdyn_free(&c->queries) ;
+ fd_close(unixmessage_sender_fd(&c->sync.out)) ;
+ unixconnection_free(&c->sync) ;
+ if (unixmessage_sender_fd(&c->async.out) >= 0)
+ {
+ fd_close(unixmessage_sender_fd(&c->async.out)) ;
+ unixconnection_free(&c->async) ;
+ }
+}
+
+genset *clients ;
+unsigned int sentinel ;
+
+static inline void client_delete (uint32_t i, uint32_t prev)
+{
+ CLIENT(prev)->next = CLIENT(i)->next ;
+ client_free(CLIENT(i)) ;
+ genset_delete(clients, i) ;
+}
+
+static int query_cancelremove_iter (char *s, void *reason)
+{
+ uint32_t i = *(uint32_t *)s ;
+ return query_cancelremove(i, *(char *)reason) ;
+}
+
+void client_remove (uint32_t i, uint32_t prev)
+{
+ client_t *c = CLIENT(i) ;
+ char reason = ECONNABORTED ;
+ if (gensetdyn_iter(&c->queries, &query_cancelremove_iter, &reason) < gensetdyn_n(&c->queries))
+ strerr_diefu1sys(111, "query_cancelremove_iter in client_remove") ;
+ while (genalloc_len(uint32_t, &c->interfaces))
+ interface_remove(genalloc_s(uint32_t, &c->interfaces)[genalloc_len(uint32_t, &c->interfaces) - 1]) ;
+ client_delete(i, prev) ;
+}
+
+void client_setdeadline (client_t *c)
+{
+ tain_t blah ;
+ tain_half(&blah, &tain_infinite_relative) ;
+ tain_add_g(&blah, &blah) ;
+ if (tain_less(&blah, &c->deadline)) tain_add_g(&c->deadline, &answertto) ;
+}
+
+void client_add (uint32_t *d, regex_t const *idstr_re, regex_t const *interfaces_re, uid_t uid, gid_t gid, int fdsock, uint32_t flags)
+{
+ uint32_t cc = genset_new(clients) ;
+ client_t *c = CLIENT(cc) ;
+ c->next = CLIENT(sentinel)->next ;
+ c->uid = uid ;
+ c->gid = gid ;
+ tain_add_g(&c->deadline, &answertto) ;
+ c->interfaces = genalloc_zero ;
+ c->queries = gensetdyn_zero ;
+ c->idstr_re = *idstr_re ;
+ c->interfaces_re = *interfaces_re ;
+ unixconnection_init(&c->sync, fdsock, fdsock) ;
+ unixconnection_init(&c->async, -1, -1) ;
+ c->async.out.fd = -(int)flags-1 ;
+ CLIENT(sentinel)->next = cc ;
+ *d = cc ;
+}
+
+void client_nextdeadline (uint32_t i, tain_t *deadline)
+{
+ client_t *c = CLIENT(i) ;
+ if (tain_less(&c->deadline, deadline)) *deadline = c->deadline ;
+}
+
+int client_prepare_iopause (uint32_t cc, tain_t *deadline, iopause_fd *x, uint32_t *j, int notlameduck)
+{
+ client_t *c = CLIENT(cc) ;
+ int inflight = 0 ;
+ uint32_t i = genalloc_len(uint32_t, &c->interfaces) ;
+ if (tain_less(&c->deadline, deadline)) *deadline = c->deadline ;
+ if (!unixmessage_sender_isempty(&c->sync.out) | !unixmessage_receiver_isempty(&c->sync.in) || (notlameduck && !unixmessage_receiver_isfull(&c->sync.in)))
+ {
+ x[*j].fd = unixmessage_sender_fd(&c->sync.out) ;
+ x[*j].events = ((!unixmessage_receiver_isempty(&c->sync.in) || (notlameduck && !unixmessage_receiver_isfull(&c->sync.in))) ? IOPAUSE_READ : 0)
+ | (!unixmessage_sender_isempty(&c->sync.out) ? IOPAUSE_WRITE : 0) ;
+ c->xindex[0] = (*j)++ ;
+ }
+ else c->xindex[0] = 0 ;
+ while (i--)
+ {
+ interface_t *y = INTERFACE(genalloc_s(uint32_t, &c->interfaces)[i]) ;
+ if (gensetdyn_n(&y->queries))
+ {
+ inflight = 1 ;
+ break ;
+ }
+ }
+ if (!unixmessage_sender_isempty(&c->async.out) || !unixmessage_receiver_isempty(&c->async.in) || inflight)
+ {
+ x[*j].fd = unixmessage_sender_fd(&c->async.out) ;
+ x[*j].events = (unixmessage_sender_isempty(&c->async.out) ? IOPAUSE_WRITE : 0)
+ | (!unixmessage_receiver_isempty(&c->async.in) || inflight ? IOPAUSE_READ : 0) ;
+ c->xindex[1] = (*j)++ ;
+ }
+ else c->xindex[1] = 0 ;
+ return c->xindex[0] || c->xindex[1] ;
+}
+
+int client_flush (uint32_t i, iopause_fd const *x)
+{
+ client_t *c = CLIENT(i) ;
+ int isflushed = 2 ;
+ if (c->xindex[0] && (x[c->xindex[0]].revents & IOPAUSE_WRITE))
+ {
+ if (!unixmessage_sender_flush(&c->sync.out))
+ if (!error_isagain(errno)) return 0 ;
+ else isflushed = 0 ;
+ else isflushed = 1 ;
+ }
+
+ if (c->xindex[1] && (x[c->xindex[1]].revents & IOPAUSE_WRITE))
+ {
+ if (!unixmessage_sender_flush(&c->async.out))
+ if (!error_isagain(errno)) return 0 ;
+ else isflushed = 0 ;
+ else isflushed = !!isflushed ;
+ }
+
+ if (isflushed == 1) tain_add_g(&c->deadline, &tain_infinite_relative) ;
+ return 1 ;
+}
+
+int client_read (uint32_t cc, iopause_fd const *x)
+{
+ client_t *c = CLIENT(cc) ;
+ if (!unixmessage_receiver_isempty(&c->sync.in) || (c->xindex[0] && x[c->xindex[0]].revents & IOPAUSE_READ))
+ {
+ if (unixmessage_sender_fd(&c->async.out) < 0)
+ {
+ unixmessage_t m ;
+ int r = unixmessage_receive(&c->sync.in, &m) ;
+ if (r < 0) return -1 ;
+ if (r)
+ {
+ uint32_t flags = -(unixmessage_sender_fd(&c->async.out) + 1) ;
+ if (!skaclient_server_bidi_ack(&m, &c->sync.out, &c->async.out, &c->async.in, c->async.mainbuf, UNIXMESSAGE_BUFSIZE, c->async.auxbuf, UNIXMESSAGE_AUXBUFSIZE, SKABUS_RPC_BANNER1, SKABUS_RPC_BANNER1_LEN, SKABUS_RPC_BANNER2, SKABUS_RPC_BANNER2_LEN))
+ {
+ unixmessage_drop(&m) ;
+ return -1 ;
+ }
+ if (!(flags & 1)) unixmessage_receiver_refuse_fds(&c->sync.in) ;
+ if (!(flags & 2)) unixmessage_receiver_refuse_fds(&c->async.in) ;
+ }
+ }
+ else
+ {
+ int r = unixmessage_handle(&c->sync.in, &parse_protocol_sync, &cc) ;
+ if (r <= 0) return r ;
+ }
+ }
+ if (!unixmessage_receiver_isempty(&c->async.in) || (c->xindex[1] && x[c->xindex[1]].revents & IOPAUSE_READ))
+ {
+ int r = unixmessage_handle(&c->async.in, &parse_protocol_async, &cc) ;
+ if (r <= 0) return r ;
+ }
+ return 1 ;
+}
diff --git a/src/rpc/skabus_rpcd_interface.c b/src/rpc/skabus_rpcd_interface.c
new file mode 100644
index 0000000..458b2d8
--- /dev/null
+++ b/src/rpc/skabus_rpcd_interface.c
@@ -0,0 +1,113 @@
+/* ISC license. */
+
+#include <string.h>
+#include <stdint.h>
+#include <errno.h>
+#include <regex.h>
+#include <skalibs/strerr2.h>
+#include <skalibs/genalloc.h>
+#include <skalibs/gensetdyn.h>
+#include <skalibs/avltree.h>
+#include <skabus/rpc.h>
+#include "skabus-rpcd.h"
+
+static inline void interface_free (interface_t *p)
+{
+ p->name[0] = p->name[1] = 0 ;
+ gensetdyn_free(&p->queries) ;
+}
+
+static void *if_dtok (uint32_t d, void *x)
+{
+ (void)x ;
+ return INTERFACE(d)->name ;
+}
+
+static int if_cmp (void const *a, void const *b, void *x)
+{
+ (void)x ;
+ return strncmp((char const *)a, (char const *)b, SKABUS_RPC_INTERFACE_MAXLEN) ;
+}
+
+gensetdyn interfaces = GENSETDYN_ZERO ;
+static avltree ifdict = AVLTREE_INIT(2, 3, 8, &if_dtok, &if_cmp, 0) ;
+
+static inline void interface_delete (uint32_t i)
+{
+ interface_t *y = INTERFACE(i) ;
+ if (!avltree_delete(&ifdict, y->name))
+ strerr_diefu1sys(111, "avltree_delete in interface_delete") ;
+ interface_free(y) ;
+ if (!gensetdyn_delete(&interfaces, i))
+ strerr_diefu1sys(111, "gensetdyn_delete in interface_delete") ;
+}
+
+static int query_fail_iter (char *s, void *reason)
+{
+ uint32_t i = *(uint32_t *)s ;
+ query_fail(i, *(char *)reason) ;
+ return 1 ;
+}
+
+static inline void client_interfacemove (client_t *c, uint32_t from, uint32_t to)
+{
+ uint32_t *ifaces = genalloc_s(uint32_t, &c->interfaces) ;
+ INTERFACE(ifaces[from])->index = to ;
+ ifaces[to] = ifaces[from] ;
+}
+
+void interface_remove (uint32_t i)
+ {
+ interface_t *y = INTERFACE(i) ;
+ client_t *c = CLIENT(y->client) ;
+ uint32_t n = gensetdyn_n(&y->queries) ;
+ char reason = ECONNRESET ;
+ gensetdyn_iter(&y->queries, &query_fail_iter, &reason) ;
+ n = genalloc_len(uint32_t, &c->interfaces) ;
+ client_interfacemove(c, n-1, y->index) ;
+ genalloc_setlen(uint32_t, &c->interfaces, n-1) ;
+ interface_delete(i) ;
+}
+
+int interface_lookup_by_name (char const *s, uint32_t *d)
+{
+ return avltree_search(&ifdict, s, d) ;
+}
+
+int interface_add (uint32_t *d, char const *name, size_t namelen, uint32_t client, char const *re, uint32_t id)
+{
+ uint32_t yy ;
+ int e ;
+ genalloc *g = &CLIENT(client)->interfaces ;
+ if (!genalloc_readyplus(uint32_t, g, 1)) return 0 ;
+ if (!gensetdyn_new(&interfaces, &yy)) return 0 ;
+ {
+ interface_t *y = INTERFACE(yy) ;
+ int r = regcomp(&y->re, re, REG_EXTENDED | REG_NOSUB) ;
+ if (r)
+ {
+ e = r == REG_ESPACE ? ENOMEM : EINVAL ;
+ goto err ;
+ }
+ memcpy(y->name, name, namelen) ; y->name[namelen] = 0 ;
+ y->id = id ;
+ y->client = client ;
+ y->index = genalloc_len(uint32_t, g) - 1 ;
+ y->queries = gensetdyn_zero ;
+ if (!avltree_insert(&ifdict, yy))
+ {
+ e = errno ;
+ regfree(&y->re) ;
+ goto err ;
+ }
+ }
+ genalloc_append(uint32_t, g, &yy) ;
+ *d = yy ;
+ return 1 ;
+
+ err:
+ if (!gensetdyn_delete(&interfaces, yy))
+ strerr_diefu1sys(111, "gensetdyn_delete in interface_add") ;
+ errno = e ;
+ return 0 ;
+}
diff --git a/src/rpc/skabus_rpcd_query.c b/src/rpc/skabus_rpcd_query.c
new file mode 100644
index 0000000..baa6df5
--- /dev/null
+++ b/src/rpc/skabus_rpcd_query.c
@@ -0,0 +1,207 @@
+ /* ISC license. */
+
+#include <sys/uio.h>
+#include <string.h>
+#include <stdint.h>
+#include <skalibs/uint32.h>
+#include <skalibs/uint64.h>
+#include <skalibs/tai.h>
+#include <skalibs/strerr2.h>
+#include <skalibs/genalloc.h>
+#include <skalibs/gensetdyn.h>
+#include <skalibs/avltree.h>
+#include <skalibs/unixmessage.h>
+#include <skabus/rpc.h>
+#include "skabus-rpcd.h"
+
+static void *query_serial_dtok (uint32_t d, void *x)
+{
+ (void)x ;
+ return &QUERY(d)->serial ;
+}
+
+static void *query_deadline_dtok (uint32_t d, void *x)
+{
+ (void)x ;
+ return &QUERY(d)->deadline ;
+}
+
+static int query_serial_cmp (void const *a, void const *b, void *x)
+{
+ uint64_t aa = *(uint64_t *)a ;
+ uint64_t bb = *(uint64_t *)b ;
+ (void)x ;
+ return aa < bb ? -1 : aa > bb ;
+}
+
+static int query_deadline_cmp (void const *a, void const *b, void *x)
+{
+ tain_t const *aa = (tain_t const *)a ;
+ tain_t const *bb = (tain_t const *)b ;
+ (void)x ;
+ return tain_less(aa, bb) ? -1 : tain_less(bb, aa) ;
+}
+
+gensetdyn queries = GENSETDYN_ZERO ;
+static avltree qserialdict = AVLTREE_INIT(10, 1, 2, &query_serial_dtok, &query_serial_cmp, 0) ;
+static avltree qdeadlinedict = AVLTREE_INIT(10, 1, 2, &query_deadline_dtok, &query_deadline_cmp, 0) ;
+
+static inline void query_delete (uint32_t i)
+{
+ if (!avltree_delete(&qdeadlinedict, &QUERY(i)->deadline))
+ strerr_diefu1sys(111, "avltree_delete qdeadlinedict in query_delete") ;
+ if (!avltree_delete(&qserialdict, &QUERY(i)->serial))
+ strerr_diefu1sys(111, "avltree_delete qserialdict in query_delete") ;
+ if (!gensetdyn_delete(&queries, i))
+ strerr_diefu1sys(111, "gensetdyn_delete in query_delete") ;
+}
+
+void query_remove (uint32_t i)
+{
+ query_t *q = QUERY(i) ;
+ client_t *c = CLIENT(q->client) ;
+ interface_t *y = INTERFACE(q->interface) ;
+ if (!gensetdyn_delete(&c->queries, q->clientindex))
+ strerr_diefu1sys(111, "gensetdyn_delete c->queries in query_remove") ;
+ if (!gensetdyn_delete(&y->queries, q->interfaceindex))
+ strerr_diefu1sys(111, "gensetdyn_delete y->queries in query_remove") ;
+ query_delete(i) ;
+}
+
+void query_fail (uint32_t i, char status)
+{
+ query_t *q = QUERY(i) ;
+ client_t *c = CLIENT(q->client) ;
+ char pack[10] = "Rssssssssr" ;
+ unixmessage_t m = { .s = pack, .len = 10, .fds = 0, .nfds = 0 } ;
+ uint64_pack_big(pack+1, q->serial) ;
+ pack[9] = status ;
+ if (!unixmessage_put(&c->async.out, &m))
+ strerr_diefu1sys(111, "unixmessage_put in query_fail") ;
+ query_remove(i) ;
+ client_setdeadline(c) ;
+}
+
+int query_cancel (uint32_t i, char reason)
+{
+ query_t *q = QUERY(i) ;
+ interface_t *y = INTERFACE(q->interface) ;
+ client_t *c = CLIENT(y->client) ;
+ char pack[14] = "Ciiiissssssssr" ;
+ unixmessage_t m = { .s = pack, .len = 14, .fds = 0, .nfds = 0 } ;
+ uint32_pack_big(pack+1, y->id) ;
+ uint64_pack_big(pack+5, q->serial) ;
+ pack[13] = reason ;
+ if (!unixmessage_put(&c->async.out, &m)) return 0 ;
+ client_setdeadline(c) ;
+ return 1 ;
+}
+
+int query_cancelremove (uint32_t i, char reason)
+{
+ if (!query_cancel(i, reason)) return 0 ;
+ query_remove(i) ;
+ return 1 ;
+}
+
+int query_lookup_by_serial (uint64_t serial, uint32_t *d)
+{
+ return avltree_search(&qserialdict, &serial, d) ;
+}
+
+int query_lookup_by_mindeadline (uint32_t *d)
+{
+ return avltree_min(&qdeadlinedict, d) ;
+}
+
+void query_get_mindeadline (tain_t *deadline)
+{
+ uint32_t d ;
+ if (query_lookup_by_mindeadline(&d)) *deadline = QUERY(d)->deadline ;
+ else tain_add_g(deadline, &tain_infinite_relative) ;
+}
+
+int query_add (uint32_t *d, tain_t const *deadline, uint32_t client, uint32_t interface)
+{
+ static uint64_t serial = 1 ;
+ uint32_t qq, cc, yy ;
+ query_t *q ;
+ if (!gensetdyn_new(&queries, &qq)) return 0 ;
+ if (!gensetdyn_new(&CLIENT(client)->queries, &cc)) goto end0 ;
+ if (!gensetdyn_new(&INTERFACE(interface)->queries, &yy)) goto end1 ;
+ q = QUERY(qq) ;
+ q->serial = serial ;
+ q->deadline = *deadline ;
+ q->client = client ;
+ q->clientindex = cc ;
+ q->interface = interface ;
+ q->interfaceindex = yy ;
+ if (!avltree_insert(&qserialdict, qq)) goto end2 ;
+ for (;;)
+ {
+ static tain_t const nano1 = { .sec = TAI_ZERO, .nano = 1 } ;
+ uint32_t d ;
+ if (!avltree_search(&qdeadlinedict, &q->deadline, &d)) break ;
+ tain_add(&q->deadline, &q->deadline, &nano1) ;
+ }
+ if (!avltree_insert(&qdeadlinedict, qq)) goto end3 ;
+ serial++ ;
+ *d = qq ;
+ return 1 ;
+
+ end3:
+ if (!avltree_delete(&qserialdict, &serial))
+ strerr_diefu1sys(111, "avltree_delete in query_add") ;
+ end2:
+ if (!gensetdyn_delete(&INTERFACE(interface)->queries, yy))
+ strerr_diefu1sys(111, "gensetdyn_delete INTERFACE(interface)->queries in query_add") ;
+ end1:
+ if (!gensetdyn_delete(&CLIENT(client)->queries, cc))
+ strerr_diefu1sys(111, "gensetdyn_delete CLIENT(client)->queries in query_add") ;
+ end0:
+ if (!gensetdyn_delete(&queries, qq))
+ strerr_diefu1sys(111, "gensetdyn_delete queries in query_add") ;
+ return 0 ;
+}
+
+int query_send (uint32_t qq, unixmessage_t const *m)
+{
+ skabus_rpc_rinfo_t rinfo ;
+ char pack[4 + SKABUS_RPC_RINFO_PACK] = "Q" ;
+ struct iovec v[2] = { { .iov_base = pack, .iov_len = 4 + SKABUS_RPC_RINFO_PACK }, { .iov_base = m->s, .iov_len = m->len } } ;
+ unixmessage_v_t mtosend = { .v = v, .vlen = 2, .fds = m->fds, .nfds = m->nfds } ;
+ query_t *q = QUERY(qq) ;
+ interface_t *y = INTERFACE(q->interface) ;
+ client_t *c = CLIENT(q->client) ;
+ char const *idstr = client_idstr(c) ;
+ size_t idstrlen = strlen(idstr) ;
+ rinfo.serial = q->serial ;
+ rinfo.limit = q->deadline ;
+ tain_copynow(&rinfo.timestamp) ;
+ rinfo.uid = c->uid ;
+ rinfo.gid = c->gid ;
+ memcpy(rinfo.idstr, idstr, idstrlen) ;
+ memset(rinfo.idstr + idstrlen, 0, SKABUS_RPC_IDSTR_SIZE + 1 - idstrlen) ;
+ uint32_pack_big(pack+1, y->id) ;
+ skabus_rpc_rinfo_pack(pack + 5, &rinfo) ;
+ c = CLIENT(y->client) ;
+ if (!unixmessage_putv_and_close(&c->async.out, &mtosend, unixmessage_bits_closeall)) return 0 ;
+ client_setdeadline(c) ;
+ return 1 ;
+}
+
+void query_reply (uint32_t qq, char result, unixmessage_t const *m)
+{
+ char pack[11] = "R" ;
+ struct iovec v[2] = { { .iov_base = pack, .iov_len = 11 }, { .iov_base = m->s, .iov_len = m->len } } ;
+ unixmessage_v_t mtosend = { .v = v, .vlen = 2, .fds = m->fds, .nfds = m->nfds } ;
+ query_t *q = QUERY(qq) ;
+ client_t *c = CLIENT(q->client) ;
+ uint64_pack_big(pack+1, q->serial) ;
+ pack[9] = 0 ;
+ pack[10] = result ;
+ if (!unixmessage_putv_and_close(&c->async.out, &mtosend, unixmessage_bits_closeall))
+ strerr_diefu1sys(111, "unixmessage_put in query_reply") ;
+ client_setdeadline(c) ;
+ query_remove(qq) ;
+}