summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLaurent Bercot <ska-skaware@skarnet.org>2020-01-10 10:35:55 +0000
committerLaurent Bercot <ska-skaware@skarnet.org>2020-01-10 10:35:55 +0000
commit9dafc75bb51c0f350330277a2e96b04f5a1c73b3 (patch)
treed00d225400b981648d362e4c3bb56b46c0608b7c /src
parent4a14a6b86a4a3c5ab8bc36e4c417a6783dfab463 (diff)
downloadskabus-9dafc75bb51c0f350330277a2e96b04f5a1c73b3.tar.xz
Add pub library and daemon
Diffstat (limited to 'src')
-rw-r--r--src/include/skabus/pub.h168
-rw-r--r--src/include/skabus/skabus.h1
-rw-r--r--src/libskabus/deps-lib/skabus23
-rw-r--r--src/libskabus/skabus-pub-internal.h12
-rw-r--r--src/libskabus/skabus_pub_end.c25
-rw-r--r--src/libskabus/skabus_pub_list.c17
-rw-r--r--src/libskabus/skabus_pub_list_async.c45
-rw-r--r--src/libskabus/skabus_pub_message_get.c32
-rw-r--r--src/libskabus/skabus_pub_message_getnfds.c11
-rw-r--r--src/libskabus/skabus_pub_register.c15
-rw-r--r--src/libskabus/skabus_pub_register_async.c30
-rw-r--r--src/libskabus/skabus_pub_send.c15
-rw-r--r--src/libskabus/skabus_pub_send_async.c16
-rw-r--r--src/libskabus/skabus_pub_send_cb.c25
-rw-r--r--src/libskabus/skabus_pub_sendpm.c15
-rw-r--r--src/libskabus/skabus_pub_sendpm_async.c27
-rw-r--r--src/libskabus/skabus_pub_sendv.c16
-rw-r--r--src/libskabus/skabus_pub_sendv_async.c18
-rw-r--r--src/libskabus/skabus_pub_sendvpm.c15
-rw-r--r--src/libskabus/skabus_pub_sendvpm_async.c25
-rw-r--r--src/libskabus/skabus_pub_start.c20
-rw-r--r--src/libskabus/skabus_pub_start_async.c19
-rw-r--r--src/libskabus/skabus_pub_subunsub.c15
-rw-r--r--src/libskabus/skabus_pub_subunsub_async.c22
-rw-r--r--src/libskabus/skabus_pub_update.c58
-rwxr-xr-xsrc/pub/deps-exe/skabus-pub-daemon1
-rwxr-xr-xsrc/pub/deps-exe/skabus-pubd4
-rw-r--r--src/pub/skabus-pub-daemon.c162
-rw-r--r--src/pub/skabus-pubd.c964
29 files changed, 1815 insertions, 1 deletions
diff --git a/src/include/skabus/pub.h b/src/include/skabus/pub.h
new file mode 100644
index 0000000..68168d0
--- /dev/null
+++ b/src/include/skabus/pub.h
@@ -0,0 +1,168 @@
+/* ISC license. */
+
+#ifndef SKABUS_PUB_H
+#define SKABUS_PUB_H
+
+#include <stdint.h>
+#include <sys/uio.h>
+
+#include <skalibs/uint64.h>
+#include <skalibs/diuint32.h>
+#include <skalibs/tai.h>
+#include <skalibs/stralloc.h>
+#include <skalibs/genalloc.h>
+#include <skalibs/unixmessage.h>
+#include <skalibs/skaclient.h>
+
+
+/* Misc constants */
+
+#define SKABUS_PUB_MAX 4000
+#define SKABUS_PUB_BANNER1 "skabus_pub v1.0 (b)\n"
+#define SKABUS_PUB_BANNER1_LEN (sizeof SKABUS_PUB_BANNER1 - 1)
+#define SKABUS_PUB_BANNER2 "skabus_pub v1.0 (a)\n"
+#define SKABUS_PUB_BANNER2_LEN (sizeof SKABUS_PUB_BANNER2 - 1)
+#define SKABUS_PUB_IDSTR_SIZE 254
+
+
+ /* skabus_pub auxiliary data */
+
+typedef struct skabus_pub_msginfo_s skabus_pub_msginfo_t, *skabus_pub_msginfo_t_ref ;
+struct skabus_pub_msginfo_s
+{
+ uint64_t serial ;
+ tain_t timestamp ;
+ uint8_t flags ;
+ char sender[SKABUS_PUB_IDSTR_SIZE + 1] ;
+} ;
+#define SKABUS_PUB_MSGINFO_ZERO { .serial = 0, .timestamp = TAIN_ZERO, .flags = 0, .sender = "" }
+
+
+ /* internal client message storage */
+
+typedef struct skabus_pub_cltinfo_s skabus_pub_cltinfo_t, *skabus_pub_cltinfo_t_ref ;
+struct skabus_pub_cltinfo_s
+{
+ skabus_pub_msginfo_t msginfo ;
+ int fd ;
+ size_t nfds ;
+ int *fds ;
+} ;
+#define SKABUS_PUB_CLTINFO_ZERO { .msginfo = SKABUS_PUB_MSGINFO_ZERO, .fd = -1, .nfds = 0, .fds = 0 }
+
+
+ /* skabus_pub client connection */
+
+typedef struct skabus_pub_s skabus_pub_t, *skabus_pub_t_ref ;
+struct skabus_pub_s
+{
+ skaclient_t connection ;
+ genalloc info ; /* array of skabus_pub_cltinfo_t */
+ size_t head ;
+ skaclient_buffer_t buffers ;
+} ;
+#define SKABUS_PUB_ZERO { .connection = SKACLIENT_ZERO, .info = GENALLOC_ZERO, .head = 0 }
+
+
+ /* Starting and ending a session */
+
+typedef struct skabus_pub_start_result_s skabus_pub_start_result_t, *skabus_pub_start_result_t_ref ;
+struct skabus_pub_start_result_s
+{
+ skaclient_cbdata_t skaclient_cbdata ;
+} ;
+
+#define skabus_pub_init(a, path, id, sre, wre, deadline, stamp) (skabus_pub_start(a, path, deadline, stamp) && skabus_pub_register(a, id, sre, wre, deadline, stamp))
+#define skabus_pub_init_g(a, path, id, sre, wre, deadline) skabus_pub_init(a, path, id, sre, wre, (deadline), &STAMP)
+
+extern int skabus_pub_start_async (skabus_pub_t *, char const *, skabus_pub_start_result_t *) ;
+extern int skabus_pub_start (skabus_pub_t *, char const *, tain_t const *, tain_t *) ;
+#define skabus_pub_start_g(a, path, deadline) skabus_pub_start(a, path, (deadline), &STAMP)
+
+extern int skabus_pub_register_async (skabus_pub_t *, char const *, char const *, char const *, unsigned char *) ;
+extern int skabus_pub_register (skabus_pub_t *, char const *, char const *, char const *, tain_t const *, tain_t *) ;
+#define skabus_pub_register_g(a, id, sre, wre, deadline) skabus_pub_register(a, id, sre, wre, (deadline), &STAMP)
+
+extern void skabus_pub_end (skabus_pub_t *) ;
+
+
+ /* Reading messages */
+
+#define skabus_pub_fd(a) skaclient_fd(&(a)->connection)
+extern int skabus_pub_update (skabus_pub_t *) ;
+extern int skabus_pub_message_getnfds (skabus_pub_t const *) ;
+extern size_t skabus_pub_message_get (skabus_pub_t *, skabus_pub_msginfo_t *, int *, int *) ;
+
+
+ /* Sending public messages */
+
+typedef struct skabus_pub_send_result_s skabus_pub_send_result_t, *skabus_pub_send_result_t_ref ;
+struct skabus_pub_send_result_s
+{
+ uint64_t u ;
+ unsigned char err ;
+} ;
+
+extern int skabus_pub_send_withfds_async (skabus_pub_t *, char const *, size_t, int const *, unsigned int, unsigned char const *, skabus_pub_send_result_t *) ;
+#define skabus_pub_send_async(a, s, len, res) skabus_pub_send_withfds_async(a, s, len, 0, 0, unixmessage_bits_closenone, res)
+
+extern uint64_t skabus_pub_send_withfds (skabus_pub_t *, char const *, size_t, int const *, unsigned int, unsigned char const *, tain_t const *, tain_t *) ;
+#define skabus_pub_send_withfds_g(a, s, len, fds, nfds, bits, deadline) skabus_pub_send_withfds(a, s, len, fds, nfds, bits, (deadline), &STAMP)
+#define skabus_pub_send(a, s, len, deadline, stamp) skabus_pub_send_withfds(a, s, len, 0, 0, unixmessage_bits_closenone, deadline, stamp)
+#define skabus_pub_send_g(a, s, len, deadline) skabus_pub_send(a, s, len, (deadline), &STAMP)
+
+extern int skabus_pub_sendv_withfds_async (skabus_pub_t *, struct iovec const *, unsigned int, int const *, unsigned int, unsigned char const *, skabus_pub_send_result_t *) ;
+#define skabus_pub_sendv_async(a, v, vlen, res) skabus_pub_sendv_withfds_async(a, v, vlen, 0, 0, unixmessage_bits_closenone, res)
+
+extern uint64_t skabus_pub_sendv_withfds (skabus_pub_t *, struct iovec const *, unsigned int, int const *, unsigned int, unsigned char const *, tain_t const *, tain_t *) ;
+#define skabus_pub_sendv_withfds_g(a, v, vlen, fds, nfds, bits, deadline) skabus_pub_sendv_withfds(a, v, vlen, fds, nfds, bits, (deadline), &STAMP)
+#define skabus_pub_sendv(a, v, vlen, deadline, stamp) skabus_pub_sendv_withfds(a, v, vlen, 0, 0, unixmessage_bits_closenone, deadline, stamp)
+#define skabus_pub_sendv_g(a, v, vlen, deadline) skabus_pub_sendv(a, v, vlen, (deadline), &STAMP)
+
+
+ /* Sending private messages */
+
+extern int skabus_pub_sendpm_withfds_async (skabus_pub_t *, char const *, char const *, size_t, int const *, unsigned int, unsigned char const *, skabus_pub_send_result_t *) ;
+#define skabus_pub_sendpm_async(a, id, s, len, res) skabus_pub_sendpm_withfds_async(a, id, s, len, 0, 0, unixmessage_bits_closenone, res)
+
+extern uint64_t skabus_pub_sendpm_withfds (skabus_pub_t *, char const *, char const *, size_t, int const *, unsigned int, unsigned char const *, tain_t const *, tain_t *) ;
+#define skabus_pub_sendpm_withfds_g(a, id, s, len, fds, nfds, bits, deadline) skabus_pub_sendpm_withfds(a, id, s, len, fds, nfds, bits, (deadline), &STAMP)
+#define skabus_pub_sendpm(a, id, s, len, deadline, stamp) skabus_pub_sendpm_withfds(a, id, s, len, 0, 0, unixmessage_bits_closenone, deadline, stamp)
+#define skabus_pub_sendpm_g(a, id, s, len, deadline) skabus_pub_sendpm(a, id, s, len, (deadline), &STAMP)
+
+extern int skabus_pub_sendvpm_withfds_async (skabus_pub_t *, char const *, struct iovec const *, unsigned int, int const *, unsigned int, unsigned char const *, skabus_pub_send_result_t *) ;
+#define skabus_pub_sendvpm_async(a, id, v, vlen, res) skabus_pub_sendvpm_withfds_async(a, id, v, vlen, 0, 0, unixmessage_bits_closenone, res)
+
+extern uint64_t skabus_pub_sendvpm_withfds (skabus_pub_t *, char const *, struct iovec const *, unsigned int, int const *, unsigned int, unsigned char const *, tain_t const *, tain_t *) ;
+#define skabus_pub_sendvpm_withfds_g(a, id, v, vlen, fds, nfds, bits, deadline) skabus_pub_sendvpm_withfds(a, id, v, vlen, fds, nfds, bits, (deadline), &STAMP)
+#define skabus_pub_sendvpm(a, id, v, vlen, deadline, stamp) skabus_pub_sendvpm_withfds(a, id, v, vlen, 0, 0, unixmessage_bits_closenone, deadline, stamp)
+#define skabus_pub_sendvpm_g(a, id, v, vlen, deadline) skabus_pub_sendvpm(a, id, v, vlen, (deadline), &STAMP)
+
+
+ /* Subscribing to a sender */
+
+extern int skabus_pub_subunsub_async (skabus_pub_t *, char, char const *, unsigned char *) ;
+#define skabus_pub_subscribe_async(a, id, err) skabus_pub_subunsub_async(a, 'S', id, err)
+#define skabus_pub_unsubscribe_async(a, id, err) skabus_pub_subunsub_async(a, 'U', id, err)
+extern int skabus_pub_subunsub (skabus_pub_t *, char, char const *, tain_t const *, tain_t *) ;
+#define skabus_pub_subscribe(a, id, deadline, stamp) skabus_pub_subunsub(a, 'S', id, deadline, stamp)
+#define skabus_pub_subscribe_g(a, id, deadline) skabus_pub_subscribe(a, id, (deadline), &STAMP)
+#define skabus_pub_unsubscribe(a, id, deadline, stamp) skabus_pub_subunsub(a, 'U', id, deadline, stamp)
+#define skabus_pub_unsubscribe_g(a, id, deadline) skabus_pub_unsubscribe(a, id, (deadline), &STAMP)
+
+
+ /* Listing all clients */
+
+typedef struct skabus_pub_list_result_s skabus_pub_list_result_t, *skabus_pub_list_result_t_ref ;
+struct skabus_pub_list_result_s
+{
+ stralloc *sa ;
+ diuint32 n ;
+ unsigned char err ;
+} ;
+
+extern int skabus_pub_list_async (skabus_pub_t *, stralloc *, skabus_pub_list_result_t *) ;
+extern int skabus_pub_list (skabus_pub_t *, stralloc *, diuint32 *, tain_t const *, tain_t *) ;
+#define skabus_pub_list_g(a, sa, deadline) skabus_pub_list(a, sa, (deadline), &STAMP)
+
+#endif
diff --git a/src/include/skabus/skabus.h b/src/include/skabus/skabus.h
index 1c07537..d0e8891 100644
--- a/src/include/skabus/skabus.h
+++ b/src/include/skabus/skabus.h
@@ -7,6 +7,7 @@
extern "C" {
#endif
+#include <skabus/pub.h>
#include <skabus/rpc.h>
#ifdef __cplusplus
diff --git a/src/libskabus/deps-lib/skabus b/src/libskabus/deps-lib/skabus
index 21582c7..a7a8369 100644
--- a/src/libskabus/deps-lib/skabus
+++ b/src/libskabus/deps-lib/skabus
@@ -1,3 +1,24 @@
+skabus_pub_end.o
+skabus_pub_list.o
+skabus_pub_list_async.o
+skabus_pub_message_get.o
+skabus_pub_message_getnfds.o
+skabus_pub_register.o
+skabus_pub_register_async.o
+skabus_pub_send.o
+skabus_pub_send_async.o
+skabus_pub_send_cb.o
+skabus_pub_sendpm.o
+skabus_pub_sendpm_async.o
+skabus_pub_sendv.o
+skabus_pub_sendv_async.o
+skabus_pub_sendvpm.o
+skabus_pub_sendvpm_async.o
+skabus_pub_start.o
+skabus_pub_start_async.o
+skabus_pub_subunsub.o
+skabus_pub_subunsub_async.o
+skabus_pub_update.o
skabus_rpc_cancel.o
skabus_rpc_cancel_async.o
skabus_rpc_end.o
@@ -17,8 +38,8 @@ skabus_rpc_r_notimpl.o
skabus_rpc_rcancel_ignore.o
skabus_rpc_release.o
skabus_rpc_reply.o
-skabus_rpc_replyv.o
skabus_rpc_reply_async.o
+skabus_rpc_replyv.o
skabus_rpc_replyv_async.o
skabus_rpc_rinfo_pack.o
skabus_rpc_rinfo_unpack.o
diff --git a/src/libskabus/skabus-pub-internal.h b/src/libskabus/skabus-pub-internal.h
new file mode 100644
index 0000000..efe8bb9
--- /dev/null
+++ b/src/libskabus/skabus-pub-internal.h
@@ -0,0 +1,12 @@
+/* ISC license. */
+
+#ifndef SKABUS_PUB_INTERNAL_H
+#define SKABUS_PUB_INTERNAL_H
+
+#include <skalibs/unixmessage.h>
+
+#define SKABUS_HEAD_MAX 64
+
+extern unixmessage_handler_func_t skabus_pub_send_cb ;
+
+#endif
diff --git a/src/libskabus/skabus_pub_end.c b/src/libskabus/skabus_pub_end.c
new file mode 100644
index 0000000..579e83e
--- /dev/null
+++ b/src/libskabus/skabus_pub_end.c
@@ -0,0 +1,25 @@
+/* ISC license. */
+
+#include <skalibs/alloc.h>
+#include <skalibs/djbunix.h>
+#include <skalibs/genalloc.h>
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+static void skabus_pub_cltinfo_free (skabus_pub_cltinfo_t *p)
+{
+ fd_close(p->fd) ;
+ if (p->nfds)
+ {
+ for (size_t i = 0 ; i < p->nfds ; i++) fd_close(p->fds[i]) ;
+ alloc_free(p->fds) ;
+ }
+}
+
+void skabus_pub_end (skabus_pub_t *a)
+{
+ skaclient_end(&a->connection) ;
+ genalloc_deepfree(skabus_pub_cltinfo_t, &a->info, &skabus_pub_cltinfo_free) ;
+ a->head = 0 ;
+}
diff --git a/src/libskabus/skabus_pub_list.c b/src/libskabus/skabus_pub_list.c
new file mode 100644
index 0000000..4b8ceeb
--- /dev/null
+++ b/src/libskabus/skabus_pub_list.c
@@ -0,0 +1,17 @@
+/* ISC license. */
+
+#include <errno.h>
+
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+int skabus_pub_list (skabus_pub_t *a, stralloc *sa, diuint32 *n, tain_t const *deadline, tain_t *stamp)
+{
+ skabus_pub_list_result_t r ;
+ if (!skabus_pub_list_async(a, sa, &r)) return 0 ;
+ if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ;
+ if (r.err) return (errno = r.err, 0) ;
+ *n = r.n ;
+ return 1 ;
+}
diff --git a/src/libskabus/skabus_pub_list_async.c b/src/libskabus/skabus_pub_list_async.c
new file mode 100644
index 0000000..48797b5
--- /dev/null
+++ b/src/libskabus/skabus_pub_list_async.c
@@ -0,0 +1,45 @@
+/* ISC license. */
+
+#include <stdint.h>
+#include <errno.h>
+
+#include <skalibs/posixishard.h>
+#include <skalibs/uint32.h>
+#include <skalibs/stralloc.h>
+#include <skalibs/unixmessage.h>
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+static int skabus_pub_list_cb (unixmessage_t const *m, void *p)
+{
+ skabus_pub_list_result_t *r = p ;
+ uint32_t n ;
+ size_t w = 9, len = m->len - 9 ;
+ if (m->len < 9 || m->nfds) goto errproto ;
+ r->err = m->s[0] ;
+ if (r->err) return 1 ;
+ uint32_unpack_big(m->s + 1, &n) ;
+ if (n > 0x7fffffffu || m->len < 9 + (n<<1)) goto errproto ;
+ if (!stralloc_readyplus(r->sa, m->len - 9 - n)) goto errproto ;
+ r->n.left = n ;
+ uint32_unpack_big(m->s + 5, &n) ; r->n.right = n ;
+ for (uint32_t i = 0 ; i < r->n.left ; i++)
+ {
+ size_t thislen ;
+ if (len < 2) goto errproto ;
+ thislen = m->s[w++] + 1 ; len-- ;
+ if ((thislen > len) || m->s[w + thislen]) goto errproto ;
+ stralloc_catb(r->sa, m->s + w, thislen) ;
+ w += thislen ; len -= thislen ;
+ }
+ return 1 ;
+ errproto:
+ return (errno = EPROTO, 0) ;
+}
+
+int skabus_pub_list_async (skabus_pub_t *a, stralloc *sa, skabus_pub_list_result_t *result)
+{
+ result->sa = sa ;
+ return skaclient_put(&a->connection, "L", 1, &skabus_pub_list_cb, result) ;
+}
diff --git a/src/libskabus/skabus_pub_message_get.c b/src/libskabus/skabus_pub_message_get.c
new file mode 100644
index 0000000..aabefaa
--- /dev/null
+++ b/src/libskabus/skabus_pub_message_get.c
@@ -0,0 +1,32 @@
+/* ISC license. */
+
+#include <string.h>
+
+#include <skalibs/alloc.h>
+#include <skalibs/genalloc.h>
+
+#include <skabus/pub.h>
+#include "skabus-pub-internal.h"
+
+size_t skabus_pub_message_get (skabus_pub_t *a, skabus_pub_msginfo_t *info, int *fd, int *fds)
+{
+ size_t n = genalloc_len(skabus_pub_cltinfo_t, &a->info) ;
+ skabus_pub_cltinfo_t *p = genalloc_s(skabus_pub_cltinfo_t, &a->info) + a->head ;
+ if (!n) return 0 ;
+ *info = p->msginfo ;
+ *fd = p->fd ;
+ if (p->nfds)
+ {
+ for (size_t i = 0 ; i < p->nfds ; i++) fds[i] = p->fds[i] ;
+ alloc_free(p->fds) ;
+ }
+
+ if (++a->head == n || a->head > SKABUS_HEAD_MAX)
+ {
+ n -= a->head ;
+ memmove(a->info.s, a->info.s + a->head * sizeof(skabus_pub_cltinfo_t), n * sizeof(skabus_pub_cltinfo_t)) ;
+ genalloc_setlen(skabus_pub_cltinfo_t, &a->info, n) ;
+ a->head = 0 ;
+ }
+ return 1 + n - a->head ;
+}
diff --git a/src/libskabus/skabus_pub_message_getnfds.c b/src/libskabus/skabus_pub_message_getnfds.c
new file mode 100644
index 0000000..828a42d
--- /dev/null
+++ b/src/libskabus/skabus_pub_message_getnfds.c
@@ -0,0 +1,11 @@
+/* ISC license. */
+
+#include <skalibs/genalloc.h>
+
+#include <skabus/pub.h>
+
+int skabus_pub_message_getnfds (skabus_pub_t const *a)
+{
+ return genalloc_len(skabus_pub_cltinfo_t, &a->info) <= a->head ? -1 :
+ (int)genalloc_s(skabus_pub_cltinfo_t, &a->info)[a->head].nfds ;
+}
diff --git a/src/libskabus/skabus_pub_register.c b/src/libskabus/skabus_pub_register.c
new file mode 100644
index 0000000..30a08a3
--- /dev/null
+++ b/src/libskabus/skabus_pub_register.c
@@ -0,0 +1,15 @@
+/* ISC license. */
+
+#include <errno.h>
+
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+int skabus_pub_register (skabus_pub_t *a, char const *id, char const *sre, char const *wre, tain_t const *deadline, tain_t *stamp)
+{
+ unsigned char r ;
+ if (!skabus_pub_register_async(a, id, sre, wre, &r)) return 0 ;
+ if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ;
+ return r ? (errno = r, 0) : 1 ;
+}
diff --git a/src/libskabus/skabus_pub_register_async.c b/src/libskabus/skabus_pub_register_async.c
new file mode 100644
index 0000000..2736bcc
--- /dev/null
+++ b/src/libskabus/skabus_pub_register_async.c
@@ -0,0 +1,30 @@
+/* ISC license. */
+
+#include <string.h>
+#include <errno.h>
+#include <sys/uio.h>
+
+#include <skalibs/uint32.h>
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+int skabus_pub_register_async (skabus_pub_t *a, char const *id, char const *sre, char const *wre, unsigned char *err)
+{
+ size_t idlen = strlen(id) ;
+ size_t srelen = strlen(sre) ;
+ size_t wrelen = strlen(wre) ;
+ char pack[10] = "R" ;
+ struct iovec v[4] =
+ {
+ { .iov_base = pack, .iov_len = 10 },
+ { .iov_base = (char *)id, .iov_len = idlen+1 },
+ { .iov_base = (char *)sre, .iov_len = srelen+1 },
+ { .iov_base = (char *)wre, .iov_len = wrelen+1 }
+ } ;
+ if (idlen > SKABUS_PUB_IDSTR_SIZE) return (errno = ERANGE, 0) ;
+ pack[1] = (unsigned char)idlen ;
+ uint32_pack_big(pack+2, srelen) ;
+ uint32_pack_big(pack+6, wrelen) ;
+ return skaclient_putv(&a->connection, v, 4, &skaclient_default_cb, err) ;
+}
diff --git a/src/libskabus/skabus_pub_send.c b/src/libskabus/skabus_pub_send.c
new file mode 100644
index 0000000..f41ede3
--- /dev/null
+++ b/src/libskabus/skabus_pub_send.c
@@ -0,0 +1,15 @@
+/* ISC license. */
+
+#include <errno.h>
+
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+uint64_t skabus_pub_send_withfds (skabus_pub_t *a, char const *s, size_t len, int const *fds, unsigned int nfds, unsigned char const *bits, tain_t const *deadline, tain_t *stamp)
+{
+ skabus_pub_send_result_t r ;
+ if (!skabus_pub_send_withfds_async(a, s, len, fds, nfds, bits, &r)) return 0 ;
+ if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ;
+ return r.err ? (errno = r.err, 0) : r.u ;
+}
diff --git a/src/libskabus/skabus_pub_send_async.c b/src/libskabus/skabus_pub_send_async.c
new file mode 100644
index 0000000..1a2bb58
--- /dev/null
+++ b/src/libskabus/skabus_pub_send_async.c
@@ -0,0 +1,16 @@
+/* ISC license. */
+
+#include <sys/uio.h>
+
+#include <skalibs/unixmessage.h>
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+#include "skabus-pub-internal.h"
+
+int skabus_pub_send_withfds_async (skabus_pub_t *a, char const *s, size_t len, int const *fds, unsigned int nfds, unsigned char const *bits, skabus_pub_send_result_t *result)
+{
+ struct iovec v[2] = { { .iov_base = "!", .iov_len = 1 }, { .iov_base = (char *)s, .iov_len = len } } ;
+ unixmessage_v_t m = { .v = v, .vlen = 2, .fds = (int *)fds, .nfds = nfds } ;
+ return skaclient_putmsgv_and_close(&a->connection, &m, bits, &skabus_pub_send_cb, result) ;
+}
diff --git a/src/libskabus/skabus_pub_send_cb.c b/src/libskabus/skabus_pub_send_cb.c
new file mode 100644
index 0000000..e95ab6c
--- /dev/null
+++ b/src/libskabus/skabus_pub_send_cb.c
@@ -0,0 +1,25 @@
+/* ISC license. */
+
+#include <errno.h>
+
+#include <skalibs/posixishard.h>
+#include <skalibs/uint64.h>
+#include <skalibs/unixmessage.h>
+
+#include <skabus/pub.h>
+
+int skabus_pub_send_cb (unixmessage_t const *m, void *p)
+{
+ skabus_pub_send_result_t *r = p ;
+ if (!m->len || m->nfds) return (errno = EPROTO, 0) ;
+ if (m->s[0])
+ {
+ r->u = 0 ;
+ r->err = m->s[0] ;
+ return 1 ;
+ }
+ if (m->len != 9) return (errno = EPROTO, 0) ;
+ uint64_unpack_big(m->s + 1, &r->u) ;
+ r->err = 0 ;
+ return 1 ;
+}
diff --git a/src/libskabus/skabus_pub_sendpm.c b/src/libskabus/skabus_pub_sendpm.c
new file mode 100644
index 0000000..073633a
--- /dev/null
+++ b/src/libskabus/skabus_pub_sendpm.c
@@ -0,0 +1,15 @@
+/* ISC license. */
+
+#include <errno.h>
+
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+uint64_t skabus_pub_sendpm_withfds (skabus_pub_t *a, char const *id, char const *s, size_t len, int const *fds, unsigned int nfds, unsigned char const *bits, tain_t const *deadline, tain_t *stamp)
+{
+ skabus_pub_send_result_t r ;
+ if (!skabus_pub_sendpm_withfds_async(a, id, s, len, fds, nfds, bits, &r)) return 0 ;
+ if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ;
+ return r.err ? (errno = r.err, 0) : r.u ;
+}
diff --git a/src/libskabus/skabus_pub_sendpm_async.c b/src/libskabus/skabus_pub_sendpm_async.c
new file mode 100644
index 0000000..d43b01f
--- /dev/null
+++ b/src/libskabus/skabus_pub_sendpm_async.c
@@ -0,0 +1,27 @@
+/* ISC license. */
+
+#include <string.h>
+#include <sys/uio.h>
+#include <errno.h>
+
+#include <skalibs/unixmessage.h>
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+#include "skabus-pub-internal.h"
+
+int skabus_pub_sendpm_withfds_async (skabus_pub_t *a, char const *idstr, char const *s, size_t len, int const *fds, unsigned int nfds, unsigned char const *bits, skabus_pub_send_result_t *result)
+{
+ size_t idlen = strlen(idstr) ;
+ char tmp[2] = "+" ;
+ struct iovec v[3] =
+ {
+ { .iov_base = tmp, .iov_len = 2 },
+ { .iov_base = (char *)idstr, .iov_len = idlen+1 },
+ { .iov_base = (char *)s, .iov_len = len }
+ } ;
+ unixmessage_v_t m = { .v = v, .vlen = 3, .fds = (int *)fds, .nfds = nfds } ;
+ if (idlen > SKABUS_PUB_IDSTR_SIZE) return (errno = ERANGE, 0) ;
+ tmp[1] = (unsigned char)idlen ;
+ return skaclient_putmsgv_and_close(&a->connection, &m, bits, &skabus_pub_send_cb, result) ;
+}
diff --git a/src/libskabus/skabus_pub_sendv.c b/src/libskabus/skabus_pub_sendv.c
new file mode 100644
index 0000000..ff1c743
--- /dev/null
+++ b/src/libskabus/skabus_pub_sendv.c
@@ -0,0 +1,16 @@
+/* ISC license. */
+
+#include <errno.h>
+
+#include <skalibs/uint64.h>
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+uint64_t skabus_pub_sendv_withfds (skabus_pub_t *a, struct iovec const *v, unsigned int vlen, int const *fds, unsigned int nfds, unsigned char const *bits, tain_t const *deadline, tain_t *stamp)
+{
+ skabus_pub_send_result_t r ;
+ if (!skabus_pub_sendv_withfds_async(a, v, vlen, fds, nfds, bits, &r)) return 0 ;
+ if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ;
+ return r.err ? (errno = r.err, 0) : r.u ;
+}
diff --git a/src/libskabus/skabus_pub_sendv_async.c b/src/libskabus/skabus_pub_sendv_async.c
new file mode 100644
index 0000000..54ce94e
--- /dev/null
+++ b/src/libskabus/skabus_pub_sendv_async.c
@@ -0,0 +1,18 @@
+/* ISC license. */
+
+#include <sys/uio.h>
+
+#include <skalibs/unixmessage.h>
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+#include "skabus-pub-internal.h"
+
+int skabus_pub_sendv_withfds_async (skabus_pub_t *a, struct iovec const *v, unsigned int vlen, int const *fds, unsigned int nfds, unsigned char const *bits, skabus_pub_send_result_t *result)
+{
+ struct iovec vv[vlen+1] ;
+ unixmessage_v_t m = { .v = vv, .vlen = vlen+1, .fds = (int *)fds, .nfds = nfds } ;
+ vv[0].iov_base = "!" ; vv[0].iov_len = 1 ;
+ for (unsigned int i = 0 ; i < vlen ; i++) vv[1+i] = v[i] ;
+ return skaclient_putmsgv_and_close(&a->connection, &m, bits, &skabus_pub_send_cb, result) ;
+}
diff --git a/src/libskabus/skabus_pub_sendvpm.c b/src/libskabus/skabus_pub_sendvpm.c
new file mode 100644
index 0000000..a898bbc
--- /dev/null
+++ b/src/libskabus/skabus_pub_sendvpm.c
@@ -0,0 +1,15 @@
+/* ISC license. */
+
+#include <errno.h>
+
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+uint64_t skabus_pub_sendvpm_withfds (skabus_pub_t *a, char const *idstr, struct iovec const *v, unsigned int vlen, int const *fds, unsigned int nfds, unsigned char const *bits, tain_t const *deadline, tain_t *stamp)
+{
+ skabus_pub_send_result_t r ;
+ if (!skabus_pub_sendvpm_withfds_async(a, idstr, v, vlen, fds, nfds, bits, &r)) return 0 ;
+ if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ;
+ return r.err ? (errno = r.err, 0) : r.u ;
+}
diff --git a/src/libskabus/skabus_pub_sendvpm_async.c b/src/libskabus/skabus_pub_sendvpm_async.c
new file mode 100644
index 0000000..b1bab4d
--- /dev/null
+++ b/src/libskabus/skabus_pub_sendvpm_async.c
@@ -0,0 +1,25 @@
+/* ISC license. */
+
+#include <string.h>
+#include <sys/uio.h>
+#include <errno.h>
+
+#include <skalibs/unixmessage.h>
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+#include "skabus-pub-internal.h"
+
+int skabus_pub_sendvpm_withfds_async (skabus_pub_t *a, char const *idstr, struct iovec const *v, unsigned int vlen, int const *fds, unsigned int nfds, unsigned char const *bits, skabus_pub_send_result_t *result)
+{
+ size_t idlen = strlen(idstr) ;
+ char tmp[2] = "+" ;
+ struct iovec vv[vlen + 2] ;
+ unixmessage_v_t m = { .v = vv, .vlen = vlen + 2, .fds = (int *)fds, .nfds = nfds } ;
+ if (idlen > SKABUS_PUB_IDSTR_SIZE) return (errno = ERANGE, 0) ;
+ tmp[1] = (unsigned char)idlen ;
+ vv[0].iov_base = tmp ; vv[0].iov_len = 2 ;
+ vv[1].iov_base = (char *)idstr ; vv[1].iov_len = idlen+1 ;
+ for (unsigned int i = 0 ; i < vlen ; i++) vv[2+i] = v[i] ;
+ return skaclient_putmsgv_and_close(&a->connection, &m, bits, &skabus_pub_send_cb, result) ;
+}
diff --git a/src/libskabus/skabus_pub_start.c b/src/libskabus/skabus_pub_start.c
new file mode 100644
index 0000000..d72b0f7
--- /dev/null
+++ b/src/libskabus/skabus_pub_start.c
@@ -0,0 +1,20 @@
+/* ISC license. */
+
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+int skabus_pub_start (skabus_pub_t *a, char const *path, tain_t const *deadline, tain_t *stamp)
+{
+ return skaclient_start_b(
+ &a->connection,
+ &a->buffers,
+ path,
+ SKACLIENT_OPTION_ASYNC_ACCEPT_FDS,
+ SKABUS_PUB_BANNER1,
+ SKABUS_PUB_BANNER1_LEN,
+ SKABUS_PUB_BANNER2,
+ SKABUS_PUB_BANNER2_LEN,
+ deadline,
+ stamp) ;
+}
diff --git a/src/libskabus/skabus_pub_start_async.c b/src/libskabus/skabus_pub_start_async.c
new file mode 100644
index 0000000..3a82f41
--- /dev/null
+++ b/src/libskabus/skabus_pub_start_async.c
@@ -0,0 +1,19 @@
+/* ISC license. */
+
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+int skabus_pub_start_async (skabus_pub_t *a, char const *path, skabus_pub_start_result_t *data)
+{
+ return skaclient_start_async_b(
+ &a->connection,
+ &a->buffers,
+ path,
+ SKACLIENT_OPTION_ASYNC_ACCEPT_FDS,
+ SKABUS_PUB_BANNER1,
+ SKABUS_PUB_BANNER1_LEN,
+ SKABUS_PUB_BANNER2,
+ SKABUS_PUB_BANNER2_LEN,
+ &data->skaclient_cbdata) ;
+}
diff --git a/src/libskabus/skabus_pub_subunsub.c b/src/libskabus/skabus_pub_subunsub.c
new file mode 100644
index 0000000..29e61f1
--- /dev/null
+++ b/src/libskabus/skabus_pub_subunsub.c
@@ -0,0 +1,15 @@
+/* ISC license. */
+
+#include <errno.h>
+
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+int skabus_pub_subunsub (skabus_pub_t *a, char what, char const *id, tain_t const *deadline, tain_t *stamp)
+{
+ unsigned char r ;
+ if (!skabus_pub_subunsub_async(a, what, id, &r)) return 0 ;
+ if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ;
+ return r ? (errno = r, 0) : 1 ;
+}
diff --git a/src/libskabus/skabus_pub_subunsub_async.c b/src/libskabus/skabus_pub_subunsub_async.c
new file mode 100644
index 0000000..d41ed98
--- /dev/null
+++ b/src/libskabus/skabus_pub_subunsub_async.c
@@ -0,0 +1,22 @@
+/* ISC license. */
+
+#include <string.h>
+#include <sys/uio.h>
+#include <errno.h>
+
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+int skabus_pub_subunsub_async (skabus_pub_t *a, char what, char const *id, unsigned char *err)
+{
+ size_t idlen = strlen(id) ;
+ char pack[2] = { what, (unsigned char)idlen } ;
+ struct iovec v[2] =
+ {
+ { .iov_base = pack, .iov_len = 2 },
+ { .iov_base = (char *)id, .iov_len = idlen+1 }
+ } ;
+ if (idlen > SKABUS_PUB_IDSTR_SIZE) return (errno = ERANGE, 0) ;
+ return skaclient_putv(&a->connection, v, 2, &skaclient_default_cb, err) ;
+}
diff --git a/src/libskabus/skabus_pub_update.c b/src/libskabus/skabus_pub_update.c
new file mode 100644
index 0000000..46e699c
--- /dev/null
+++ b/src/libskabus/skabus_pub_update.c
@@ -0,0 +1,58 @@
+/* ISC license. */
+
+#include <string.h>
+#include <stdint.h>
+#include <errno.h>
+
+#include <skalibs/posixishard.h>
+#include <skalibs/uint64.h>
+#include <skalibs/tai.h>
+#include <skalibs/alloc.h>
+#include <skalibs/genalloc.h>
+#include <skalibs/unixmessage.h>
+#include <skalibs/skaclient.h>
+
+#include <skabus/pub.h>
+
+static int handler (unixmessage_t const *m, void *x)
+{
+ skabus_pub_t *a = (skabus_pub_t *)x ;
+ size_t n = genalloc_len(skabus_pub_cltinfo_t, &a->info) ;
+ char const *s = m->s ;
+ size_t len = m->len ;
+ skabus_pub_cltinfo_t *p ;
+ uint8_t idlen ;
+ if (len < 11 + TAIN_PACK || !m->nfds) goto errproto ;
+ if (!genalloc_readyplus(skabus_pub_cltinfo_t, &a->info, 1)) goto err ;
+ p = genalloc_s(skabus_pub_cltinfo_t, &a->info) + n ;
+ p->fd = m->fds[0] ;
+ p->nfds = m->nfds - 1 ;
+ if (p->nfds > 1)
+ {
+ p->fds = alloc(p->nfds * sizeof(int)) ;
+ if (!p->fds) goto err ;
+ for (size_t i = 0 ; i < p->nfds ; i++) p->fds[i] = m->fds[1 + i] ;
+ }
+ else p->fds = 0 ;
+ uint64_unpack_big(s, &p->msginfo.serial) ; s += 8 ; len -= 8 ;
+ tain_unpack(s, &p->msginfo.timestamp) ; s += TAIN_PACK ; len -= TAIN_PACK ;
+ p->msginfo.flags = *s++ ; len-- ;
+ idlen = *s++ ; len-- ;
+ if (len != (size_t)idlen + 1 || s[idlen] || idlen > SKABUS_PUB_IDSTR_SIZE) goto errprotof ;
+ memcpy(p->msginfo.sender, s, len) ;
+ genalloc_setlen(skabus_pub_cltinfo_t, &a->info, n+1) ;
+ return 1 ;
+
+ errprotof:
+ alloc_free(p->fds) ;
+ errproto:
+ errno = EPROTO ;
+ err:
+ unixmessage_drop(m) ;
+ return 0 ;
+}
+
+int skabus_pub_update (skabus_pub_t *a)
+{
+ return skaclient_update(&a->connection, &handler, a) ;
+}
diff --git a/src/pub/deps-exe/skabus-pub-daemon b/src/pub/deps-exe/skabus-pub-daemon
new file mode 100755
index 0000000..e7187fe
--- /dev/null
+++ b/src/pub/deps-exe/skabus-pub-daemon
@@ -0,0 +1 @@
+-lskarnet
diff --git a/src/pub/deps-exe/skabus-pubd b/src/pub/deps-exe/skabus-pubd
new file mode 100755
index 0000000..ac0cace
--- /dev/null
+++ b/src/pub/deps-exe/skabus-pubd
@@ -0,0 +1,4 @@
+-ls6
+-lskarnet
+${SOCKET_LIB}
+${TAINNOW_LIB}
diff --git a/src/pub/skabus-pub-daemon.c b/src/pub/skabus-pub-daemon.c
new file mode 100644
index 0000000..54b4359
--- /dev/null
+++ b/src/pub/skabus-pub-daemon.c
@@ -0,0 +1,162 @@
+/* ISC license. */
+
+#include <sys/types.h>
+#include <limits.h>
+
+#include <skalibs/types.h>
+#include <skalibs/sgetopt.h>
+#include <skalibs/strerr2.h>
+#include <skalibs/djbunix.h>
+
+#include <s6/config.h>
+
+#include <skabus/config.h>
+
+#define USAGE "skabus-pub-daemon [ -v verbosity ] [ -d | -D ] [ -1 ] [ -c maxconn ] [ -b backlog ] [ -G gid,gid,... ] [ -g gid ] [ -u uid ] [ -U ] [ -t timeout ] [ -T lameducktimeout ] [ -i rulesdir | -x rulesfile ] [ -S | -s ] [ -k announcere ] path msgfsdir"
+#define dieusage() strerr_dieusage(100, USAGE)
+
+int main (int argc, char const *const *argv, char const *const *envp)
+{
+ unsigned int verbosity = 1 ;
+ int flag1 = 0 ;
+ int flagU = 0 ;
+ int flagreuse = 1 ;
+ uid_t uid = 0 ;
+ gid_t gid = 0 ;
+ gid_t gids[NGROUPS_MAX] ;
+ size_t gidn = (unsigned int)-1 ;
+ unsigned int maxconn = 0 ;
+ unsigned int backlog = (unsigned int)-1 ;
+ int flagpublic = 0 ;
+ unsigned int timeout = 0 ;
+ unsigned int ltimeout = 0 ;
+ char const *rulesdir = 0 ;
+ char const *rulesfile = 0 ;
+ char const *announcere = 0 ;
+ PROG = "skabus-pub-daemon" ;
+ {
+ subgetopt_t l = SUBGETOPT_ZERO ;
+ for (;;)
+ {
+ int opt = subgetopt_r(argc, argv, "Dd1USsv:c:b:u:g:G:t:T:i:x:k:", &l) ;
+ if (opt == -1) break ;
+ switch (opt)
+ {
+ case 'D' : flagreuse = 0 ; break ;
+ case 'd' : flagreuse = 1 ; break ;
+ case '1' : flag1 = 1 ; break ;
+ case 'v' : if (!uint0_scan(l.arg, &verbosity)) dieusage() ; break ;
+ case 'c' : if (!uint0_scan(l.arg, &maxconn)) dieusage() ; if (!maxconn) maxconn = 1 ; break ;
+ case 'b' : if (!uint0_scan(l.arg, &backlog)) dieusage() ; break ;
+ case 'u' : if (!uid0_scan(l.arg, &uid)) dieusage() ; break ;
+ case 'g' : if (!gid0_scan(l.arg, &gid)) dieusage() ; break ;
+ case 'G' : if (!gid_scanlist(gids, NGROUPS_MAX, l.arg, &gidn) && *l.arg) dieusage() ; break ;
+ case 'U' : flagU = 1 ; uid = 0 ; gid = 0 ; gidn = (size_t)-1 ; break ;
+ case 'S' : flagpublic = 0 ; break ;
+ case 's' : flagpublic = 1 ; break ;
+ case 't' : if (!uint0_scan(l.arg, &timeout)) dieusage() ; break ;
+ case 'T' : if (!uint0_scan(l.arg, &ltimeout)) dieusage() ; break ;
+ case 'i' : rulesdir = l.arg ; rulesfile = 0 ; break ;
+ case 'x' : rulesfile = l.arg ; rulesdir = 0 ; break ;
+ case 'k' : announcere = l.arg ; break ;
+ default : dieusage() ;
+ }
+ }
+ argc -= l.ind ; argv += l.ind ;
+ if (argc < 2) dieusage() ;
+ }
+
+ {
+ unsigned int m = 0, pos = 0 ;
+ char const *newargv[33] ;
+ char fmt[UINT_FMT * 4 + UID_FMT + GID_FMT * (1 + NGROUPS_MAX)] ;
+ newargv[m++] = S6_EXTBINPREFIX "s6-ipcserver-socketbinder" ;
+ if (!flagreuse) newargv[m++] = "-D" ;
+ if (backlog != (unsigned int)-1)
+ {
+ newargv[m++] = "-b" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, backlog) ;
+ fmt[pos++] = 0 ;
+ }
+ newargv[m++] = "--" ;
+ newargv[m++] = *argv++ ;
+ if (flagU || uid || gid || gidn != (unsigned int)-1)
+ {
+ newargv[m++] = S6_EXTBINPREFIX "s6-applyuidgid" ;
+ if (flagU) newargv[m++] = "-Uz" ;
+ if (uid)
+ {
+ newargv[m++] = "-u" ;
+ newargv[m++] = fmt + pos ;
+ pos += uid_fmt(fmt + pos, uid) ;
+ fmt[pos++] = 0 ;
+ }
+ if (gid)
+ {
+ newargv[m++] = "-g" ;
+ newargv[m++] = fmt + pos ;
+ pos += gid_fmt(fmt + pos, gid) ;
+ fmt[pos++] = 0 ;
+ }
+ if (gidn != (unsigned int)-1)
+ {
+ newargv[m++] = "-G" ;
+ newargv[m++] = fmt + pos ;
+ pos += gid_fmtlist(fmt + pos, gids, gidn) ;
+ fmt[pos++] = 0 ;
+ }
+ newargv[m++] = "--" ;
+ }
+ newargv[m++] = SKABUS_BINPREFIX "skabus-pubd" ;
+ if (verbosity != 1)
+ {
+ newargv[m++] = "-v" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, verbosity) ;
+ fmt[pos++] = 0 ;
+ }
+ if (flag1) newargv[m++] = "-1" ;
+ if (flagpublic) newargv[m++] = "-s" ;
+ if (maxconn)
+ {
+ newargv[m++] = "-c" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, maxconn) ;
+ fmt[pos++] = 0 ;
+ }
+ if (timeout)
+ {
+ newargv[m++] = "-t" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, timeout) ;
+ fmt[pos++] = 0 ;
+ }
+ if (ltimeout)
+ {
+ newargv[m++] = "-T" ;
+ newargv[m++] = fmt + pos ;
+ pos += uint_fmt(fmt + pos, ltimeout) ;
+ fmt[pos++] = 0 ;
+ }
+ if (rulesdir)
+ {
+ newargv[m++] = "-i" ;
+ newargv[m++] = rulesdir ;
+ }
+ else if (rulesfile)
+ {
+ newargv[m++] = "-x" ;
+ newargv[m++] = rulesfile ;
+ }
+ if (announcere)
+ {
+ newargv[m++] = "-k" ;
+ newargv[m++] = announcere ;
+ }
+ newargv[m++] = "--" ;
+ newargv[m++] = *argv++ ;
+ newargv[m++] = 0 ;
+ xpathexec_run(newargv[0], newargv, envp) ;
+ }
+}
diff --git a/src/pub/skabus-pubd.c b/src/pub/skabus-pubd.c
new file mode 100644
index 0000000..99a6ddf
--- /dev/null
+++ b/src/pub/skabus-pubd.c
@@ -0,0 +1,964 @@
+/* ISC license. */
+
+#include <stdint.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <regex.h>
+
+#include <skalibs/posixishard.h>
+#include <skalibs/uint32.h>
+#include <skalibs/uint64.h>
+#include <skalibs/types.h>
+#include <skalibs/allreadwrite.h>
+#include <skalibs/bytestr.h>
+#include <skalibs/sgetopt.h>
+#include <skalibs/env.h>
+#include <skalibs/error.h>
+#include <skalibs/strerr2.h>
+#include <skalibs/stralloc.h>
+#include <skalibs/tai.h>
+#include <skalibs/djbunix.h>
+#include <skalibs/sig.h>
+#include <skalibs/iopause.h>
+#include <skalibs/selfpipe.h>
+#include <skalibs/cdb.h>
+#include <skalibs/webipc.h>
+#include <skalibs/genset.h>
+#include <skalibs/avltree.h>
+#include <skalibs/avltreen.h>
+#include <skalibs/unixmessage.h>
+#include <skalibs/unixconnection.h>
+#include <skalibs/skaclient.h>
+
+#include <s6/accessrules.h>
+
+#include <skabus/pub.h>
+
+#define USAGE "skabus-pubd [ -v verbosity ] [ -1 ] [ -c maxconn ] [ -t timeout ] [ -T lameducktimeout ] [ -i rulesdir | -x rulesfile ] [ -S | -s ] [ -k controlre ] msgfsdir"
+#define dieusage() strerr_dieusage(100, USAGE) ;
+#define dienomem() strerr_diefu1sys(111, "stralloc_catb") ;
+#define die() strerr_dief1sys(101, "unexpected error") ;
+
+#define MSGINFO_PACK(n) (11 + TAIN_PACK + (n))
+
+static unsigned int verbosity = 1 ;
+static int cont = 1 ;
+static uint64_t serial = 1 ;
+static tain_t answertto = TAIN_INFINITE_RELATIVE ;
+static tain_t lameduckdeadline = TAIN_INFINITE_RELATIVE ;
+static int flagidstrpub = 0 ;
+
+static unsigned int rulestype = 0 ;
+static char const *rules = 0 ;
+static int cdbfd = -1 ;
+static struct cdb cdbmap = CDB_ZERO ;
+
+static char const *msgfsdir ;
+
+static void handle_signals (void)
+{
+ for (;;) switch (selfpipe_read())
+ {
+ case -1 : strerr_diefu1sys(111, "selfpipe_read()") ;
+ case 0 : return ;
+ case SIGTERM :
+ {
+ if (cont)
+ {
+ cont = 0 ;
+ tain_add_g(&lameduckdeadline, &lameduckdeadline) ;
+ }
+ break ;
+ }
+ case SIGHUP :
+ {
+ int fd ;
+ struct cdb c = CDB_ZERO ;
+ if (rulestype != 2) break ;
+ fd = open_readb(rules) ;
+ if (fd < 0) break ;
+ if (cdb_init(&c, fd) < 0)
+ {
+ fd_close(fd) ;
+ break ;
+ }
+ cdb_free(&cdbmap) ;
+ fd_close(cdbfd) ;
+ cdbfd = fd ;
+ cdbmap = c ;
+ }
+ break ;
+ default : break ;
+ }
+}
+
+static void *uint32_dtok (uint32_t d, void *x)
+{
+ (void)x ;
+ return (void *)(uintptr_t)d ;
+}
+
+static int ptr_cmp (void const *a, void const *b, void *x)
+{
+ (void)x ;
+ return a < b ? -1 : a > b ;
+}
+
+
+ /* fd reference counter */
+
+typedef struct fdcount_s fdcount_t, *fdcount_t_ref ;
+struct fdcount_s
+{
+ int fd ;
+ uint32_t n ;
+} ;
+
+static void *fdcount_dtok (uint32_t d, void *x)
+{
+ return &GENSETDYN_P(fdcount_t, (gensetdyn *)x, d)->fd ;
+}
+
+static int fd_cmp (void const *a, void const *b, void *x)
+{
+ int fda = *(int *)a ;
+ int fdb = *(int *)b ;
+ (void)x ;
+ return fda < fdb ? -1 : fda > fdb ;
+}
+
+static gensetdyn fdcountblob = GENSETDYN_INIT(fdcount_t, 3, 3, 8) ;
+static avltree fdcountmap = AVLTREE_INIT(3, 3, 8, &fdcount_dtok, &fd_cmp, &fdcountblob) ;
+#define FDCOUNT(i) GENSETDYN_P(fdcount_t, &fdcountblob, (i))
+
+static inline fdcount_t *fdcount_search (int fd)
+{
+ uint32_t d ;
+ fdcount_t *p ;
+ if (avltree_search(&fdcountmap, &fd, &d)) return FDCOUNT(d) ;
+ if (!gensetdyn_new(&fdcountblob, &d)) dienomem() ;
+ p = FDCOUNT(d) ;
+ p->fd = fd ;
+ p->n = 0 ;
+ if (!avltree_insert(&fdcountmap, d)) dienomem() ;
+ return p ;
+}
+
+static void fdcount_closecb (int fd, void *p)
+{
+ uint32_t d ;
+ avltree_search(&fdcountmap, &fd, &d) ;
+ if (!--FDCOUNT(d)->n) fd_close(fd) ;
+}
+
+
+ /* client */
+
+typedef struct client_s client_t, *client_t_ref ;
+struct client_s
+{
+ uint32_t next ;
+ uint32_t xindexsync ;
+ uint32_t xindexasync ;
+ tain_t deadline ;
+ regex_t idstr_re ;
+ regex_t subscribe_re ;
+ regex_t write_re ;
+ avltree subscribers ;
+ unixmessage_sender_t asyncout ;
+ unixconnection_t sync ;
+ char idstr[SKABUS_PUB_IDSTR_SIZE + 1] ;
+} ;
+
+static genset *clients ;
+static unsigned int sentinel ;
+static avltreen *clientmap ;
+#define CLIENT(i) genset_p(client_t, clients, (i))
+#define numconn (genset_n(clients) - 1)
+
+static inline void client_free (client_t *c)
+{
+ fd_close(unixmessage_sender_fd(&c->sync.out)) ;
+ if (unixmessage_sender_fd(&c->asyncout) >= 0)
+ fd_close(unixmessage_sender_fd(&c->asyncout)) ;
+ unixconnection_free(&c->sync) ;
+ unixmessage_sender_free(&c->asyncout) ;
+ if (c->idstr[0])
+ {
+ regfree(&c->subscribe_re) ;
+ regfree(&c->write_re) ;
+ }
+ else regfree(&c->idstr_re) ;
+ avltree_free(&c->subscribers) ;
+}
+
+static void *idstr_dtok (uint32_t d, void *x)
+{
+ (void)x ;
+ return CLIENT(d)->idstr ;
+}
+
+static int idstr_cmp (void const *a, void const *b, void *x)
+{
+ (void)x ;
+ return strcmp((char const *)a, (char const *)b) ;
+}
+
+static inline void client_delete (uint32_t cc, uint32_t prev)
+{
+ client_t *c = CLIENT(cc) ;
+ uint32_t i = CLIENT(sentinel)->next ;
+ while (i != sentinel)
+ {
+ client_t *p = CLIENT(i) ;
+ avltree_delete(&p->subscribers, (void const *)(uintptr_t)cc) ;
+ i = p->next ;
+ }
+ CLIENT(prev)->next = c->next ;
+ if (c->idstr[0]) avltreen_delete(clientmap, c->idstr) ;
+ client_free(c) ;
+ genset_delete(clients, cc) ;
+}
+
+static void remove (uint32_t *i, uint32_t j)
+{
+ client_delete(*i, j) ;
+ *i = j ;
+}
+
+static void client_setdeadline (client_t *c)
+{
+ tain_t blah ;
+ tain_half(&blah, &tain_infinite_relative) ;
+ tain_add_g(&blah, &blah) ;
+ if (tain_less(&blah, &c->deadline))
+ tain_add_g(&c->deadline, &answertto) ;
+}
+
+static inline int client_prepare_iopause (uint32_t i, tain_t *deadline, iopause_fd *x, uint32_t *j)
+{
+ client_t *c = CLIENT(i) ;
+ if (tain_less(&c->deadline, deadline)) *deadline = c->deadline ;
+ if (!unixmessage_sender_isempty(&c->sync.out) || !unixmessage_receiver_isempty(&c->sync.in) || (cont && !unixmessage_receiver_isfull(&c->sync.in)))
+ {
+ x[*j].fd = unixmessage_sender_fd(&c->sync.out) ;
+ x[*j].events = (!unixmessage_receiver_isempty(&c->sync.in) || (cont && !unixmessage_receiver_isfull(&c->sync.in)) ? IOPAUSE_READ : 0) | (!unixmessage_sender_isempty(&c->sync.out) ? IOPAUSE_WRITE : 0) ;
+ c->xindexsync = (*j)++ ;
+ }
+ else c->xindexsync = 0 ;
+ if (!unixmessage_sender_isempty(&c->asyncout))
+ {
+ x[*j].fd = unixmessage_sender_fd(&c->asyncout) ;
+ x[*j].events = IOPAUSE_WRITE ;
+ c->xindexasync = (*j)++ ;
+ }
+ else c->xindexasync = 0 ;
+ return c->xindexsync || c->xindexasync ;
+}
+
+static inline void client_add (int fd, regex_t const *idstr_re, unsigned int flags)
+{
+ uint32_t i = genset_new(clients) ;
+ client_t *c = CLIENT(i) ;
+ unixconnection_init(&c->sync, fd, fd) ;
+ unixmessage_sender_init_withclosecb(&c->asyncout, -(int)flags - 1, &fdcount_closecb, 0) ;
+ c->idstr[0] = 0 ;
+ c->idstr_re = *idstr_re ;
+ avltree_init(&c->subscribers, 3, 3, 8, &uint32_dtok, &ptr_cmp, 0) ;
+ tain_add_g(&c->deadline, &answertto) ;
+ c->next = CLIENT(sentinel)->next ;
+ CLIENT(sentinel)->next = i ;
+}
+
+static int enqueue_message (uint32_t dd, unixmessage_t const *m)
+{
+ client_t *d = CLIENT(dd) ;
+ if (!unixmessage_put_and_close(&d->asyncout, m, unixmessage_bits_closeall)) return 0 ;
+ client_setdeadline(d) ;
+ for (unsigned int i = 0 ; i < m->nfds ; i++)
+ fdcount_search(m->fds[i])->n++ ;
+ return 1 ;
+}
+
+static int unsendmessage_iter (uint32_t dd, unsigned int h, void *data)
+{
+ unixmessage_unput(&CLIENT(dd)->asyncout) ;
+ (void)h ;
+ (void)data ;
+ return 1 ;
+}
+
+static int sendmessage_iter (uint32_t dd, unsigned int h, void *data)
+{
+ (void)h ;
+ return enqueue_message(dd, (unixmessage_t *)data) ;
+}
+
+static int store_text (char const *s, size_t len)
+{
+ int fd ;
+ size_t msgfsdirlen = strlen(msgfsdir) ;
+ char fn[msgfsdirlen + TAIN_PACK + 10] ;
+ fn[0] = '@' ;
+ memcpy(fn + 1, msgfsdir, msgfsdirlen) ;
+ fn[1 + msgfsdirlen] = '/' ;
+ tain_pack(fn + msgfsdirlen + 2, &STAMP) ;
+ memcpy(fn + msgfsdirlen + 2 + TAIN_PACK, ":XXXXXX", 8) ;
+ fd = mkstemp(fn) ;
+ if (fd < 0) return fd ;
+ if (!writenclose_unsafe(fd, s, len))
+ {
+ fd_close(fd) ;
+ return -1 ;
+ }
+ fd_close(fd) ;
+ fd = open(fn, O_RDONLY) ; /* too bad you can't just lose writing rights */
+ unlink_void(fn) ; /* still there as long as fd is open */
+ return fd ;
+}
+
+static void fill_msginfo (char *pack, char const *idstr, size_t idlen, uint8_t flags)
+{
+ uint64_pack_big(pack, serial++) ;
+ tain_pack(pack + 8, &STAMP) ;
+ pack[8 + TAIN_PACK] = flags ;
+ pack[9 + TAIN_PACK] = idlen ;
+ memcpy(pack + 10 + TAIN_PACK, idstr, idlen + 1) ;
+}
+
+static int announce (char const *what, size_t whatlen, char const *idstr)
+{
+ size_t idlen = strlen(idstr) ;
+ int fd ;
+ char s[MSGINFO_PACK(idlen)] ;
+ unixmessage_t m = { .s = s, .len = MSGINFO_PACK(idlen), .fds = &fd, .nfds = 1 } ;
+ if (!*idstr) return 1 ;
+ fill_msginfo(s, idstr, idlen, 0) ;
+ fd = store_text(what, whatlen) ;
+ if (fd < 0) return 0 ;
+ if (!avltree_iter_withcancel(&CLIENT(sentinel)->subscribers, &sendmessage_iter, &unsendmessage_iter, &m)) return 0 ;
+ return 1 ;
+}
+
+static inline int client_flush (uint32_t i, iopause_fd const *x)
+{
+ client_t *c = CLIENT(i) ;
+ int isflushed = 2 ;
+ if (c->xindexsync && (x[c->xindexsync].revents & IOPAUSE_WRITE))
+ {
+ if (!unixmessage_sender_flush(&c->sync.out))
+ if (error_isagain(errno)) isflushed = 0 ;
+ else
+ {
+ char what[2] = "-" ;
+ what[1] = errno ;
+ if (verbosity) strerr_warnwu2sys("unixmessage_sender_flush ", c->idstr) ;
+ if (!announce(what, 2, c->idstr)) dienomem() ;
+ return 0 ;
+ }
+ else isflushed = 1 ;
+ }
+
+ if (c->xindexasync && (x[c->xindexasync].revents & IOPAUSE_WRITE))
+ {
+ if (!unixmessage_sender_flush(&c->asyncout))
+ if (error_isagain(errno)) isflushed = 0 ;
+ else
+ {
+ char what[2] = "-" ;
+ what[1] = errno ;
+ if (verbosity) strerr_warnwu2sys("unixmessage_sender_flush ", c->idstr) ;
+ if (!announce(what, 2, c->idstr)) dienomem() ;
+ return 0 ;
+ }
+ else isflushed = !!isflushed ;
+ }
+
+ if (isflushed == 1) tain_add_g(&c->deadline, &tain_infinite_relative) ;
+ return 1 ;
+}
+
+static int answer (client_t *c, char e)
+{
+ unixmessage_t m = { .s = &e, .len = 1, .fds = 0, .nfds = 0 } ;
+ if (!unixmessage_put(&c->sync.out, &m)) return 0 ;
+ client_setdeadline(c) ;
+ return 1 ;
+}
+
+static int do_register (uint32_t cc, unixmessage_t const *m)
+{
+ uint32_t srelen, wrelen, dummy ;
+ client_t *c = CLIENT(cc) ;
+ char const *s = m->s ;
+ size_t len = m->len ;
+ int r ;
+ size_t idlen ;
+ if (len < 12 || m->nfds) return (errno = EPROTO, 0) ;
+ idlen = (unsigned char)*s++ ; len-- ;
+ uint32_unpack_big(s, &srelen) ; s += 4 ; len -= 4 ;
+ uint32_unpack_big(s, &wrelen) ; s += 4 ; len -= 4 ;
+ if (len != idlen + srelen + wrelen + 3 || s[idlen] || s[idlen + srelen + 1] || s[idlen + srelen + wrelen + 2] || !idlen || !s[idlen-1]) return (errno = EPROTO, 0) ;
+ if (idlen > SKABUS_PUB_IDSTR_SIZE) return answer(c, ENAMETOOLONG) ;
+ if (c->idstr[0]) return answer(c, EISCONN) ;
+ if (regexec(&c->idstr_re, s, 0, 0, 0)) return answer(c, EPERM) ;
+ if (avltreen_search(clientmap, s, &dummy)) return answer(c, EBUSY) ;
+ r = regcomp(&c->subscribe_re, s + idlen + 1, REG_EXTENDED | REG_NOSUB) ;
+ if (r) return answer(c, r == REG_ESPACE ? ENOMEM : EINVAL) ;
+ r = regcomp(&c->write_re, s + idlen + srelen + 2, REG_EXTENDED | REG_NOSUB) ;
+ if (r)
+ {
+ regfree(&c->subscribe_re) ;
+ return answer(c, r == REG_ESPACE ? ENOMEM : EINVAL) ;
+ }
+ memcpy(c->idstr, s, idlen+1) ;
+ avltreen_insert(clientmap, cc) ;
+ if (!announce("+", 1, s))
+ {
+ char e = errno ;
+ regfree(&c->write_re) ;
+ regfree(&c->subscribe_re) ;
+ avltreen_delete(clientmap, c->idstr) ;
+ return answer(c, e) ;
+ }
+ regfree(&c->idstr_re) ;
+ return answer(c, 0) ;
+}
+
+static int do_list (uint32_t cc, unixmessage_t const *m)
+{
+ client_t *c = CLIENT(cc) ;
+ uint32_t dd = CLIENT(sentinel)->next ;
+ if (m->len || m->nfds) return (errno = EPROTO, 0) ;
+ unsigned int n = avltreen_len(clientmap) - 1 ;
+ {
+ char pack[9] = "" ;
+ char lens[n] ;
+ struct iovec v[1+(n<<1)] ;
+ unixmessage_v_t mreply = { .v = v, .vlen = 1+(n<<1), .fds = 0, .nfds = 0 } ;
+ unsigned int registered = 0 ;
+ v[0].iov_base = pack ; v[0].iov_len = 9 ;
+ for (unsigned int i = 0 ; i < n ; i++)
+ {
+ client_t *d = CLIENT(dd) ;
+ if (d->idstr[0])
+ {
+ size_t len = strlen(d->idstr) ;
+ lens[i] = (unsigned char)len ;
+ v[1+(i<<1)].iov_base = lens + i ;
+ v[1+(i<<1)].iov_len = 1 ;
+ v[2+(i<<1)].iov_base = d->idstr ;
+ v[2+(i<<1)].iov_len = len+1 ;
+ registered++ ;
+ }
+ dd = d->next ;
+ }
+ uint32_pack_big(pack+1, registered) ;
+ uint32_pack_big(pack+5, n - registered) ;
+ if (!unixmessage_putv(&c->sync.out, &mreply)) return answer(c, errno) ;
+ }
+ client_setdeadline(c) ;
+ return 1 ;
+}
+
+static int do_unsubscribe (uint32_t cc, unixmessage_t const *m)
+{
+ uint32_t dd ;
+ char const *s = m->s ;
+ size_t len = m->len ;
+ client_t *c = CLIENT(cc) ;
+ if (!len-- || m->nfds) return (errno = EPROTO, 0) ;
+ if (len != (unsigned int)(unsigned char)s[0] + 1 || s[len]) return (errno = EPROTO, 0) ;
+ s++ ;
+ if (!c->idstr[0]) return answer(c, EPERM) ;
+ if (!avltreen_search(clientmap, s, &dd)) return answer(c, errno) ;
+ if (!avltree_delete(&CLIENT(dd)->subscribers, (void const *)(uintptr_t)cc)) return answer(c, errno) ;
+ return answer(c, 0) ;
+}
+
+static int do_subscribe (uint32_t cc, unixmessage_t const *m)
+{
+ client_t *c = CLIENT(cc) ;
+ char const *s = m->s ;
+ size_t len = m->len ;
+ client_t *d ;
+ uint32_t dd, dummy ;
+ if (!len-- || m->nfds) return (errno = EPROTO, 0) ;
+ if (len != (unsigned int)(unsigned char)s[0] + 1 || s[len]) return (errno = EPROTO, 0) ;
+ s++ ;
+ if (!c->idstr[0]) return answer(c, EPERM) ;
+ if (!avltreen_search(clientmap, s, &dd)) return answer(c, errno) ;
+ d = CLIENT(dd) ;
+ if (regexec(&d->subscribe_re, c->idstr, 0, 0, 0)) return answer(c, EPERM) ;
+ if (!avltree_search(&d->subscribers, c->idstr, &dummy))
+ {
+ if (!avltree_insert(&d->subscribers, cc)) return answer(c, errno) ;
+ }
+ return answer(c, 0) ;
+}
+
+static int do_sendpm (uint32_t cc, unixmessage_t const *m)
+{
+ client_t *c = CLIENT(cc) ;
+ char const *s = m->s ;
+ size_t len = m->len ;
+ size_t idlen ;
+ uint32_t dd ;
+ client_t *d ;
+ if (len < 2) return (errno = EPROTO, 0) ;
+ idlen = (unsigned char)s[0] ; s++ ; len-- ;
+ if (len < idlen + 1 || s[idlen]) return (errno = EPROTO, 0) ;
+ if (idlen > SKABUS_PUB_IDSTR_SIZE)
+ {
+ unixmessage_drop(m) ;
+ return answer(c, ENAMETOOLONG) ;
+ }
+ if (m->nfds > UNIXMESSAGE_MAXFDS - 1)
+ {
+ unixmessage_drop(m) ;
+ return answer(c, ENFILE) ;
+ }
+ if (!c->idstr[0])
+ {
+ unixmessage_drop(m) ;
+ return answer(c, EPERM) ;
+ }
+ if (!avltreen_search(clientmap, s, &dd))
+ {
+ unixmessage_drop(m) ;
+ return answer(c, errno) ;
+ }
+ s += idlen+1 ; len -= idlen+1 ;
+ d = CLIENT(dd) ;
+ if (regexec(&d->write_re, c->idstr, 0, 0, 0))
+ {
+ unixmessage_drop(m) ;
+ return answer(c, EPERM) ;
+ }
+
+ {
+ size_t cidlen = strlen(c->idstr) ;
+ char pack[1 + MSGINFO_PACK(cidlen)] ;
+ int fds[1 + m->nfds] ;
+ unixmessage_t mtosend = { .s = pack + 1, .len = MSGINFO_PACK(cidlen), .fds = fds, .nfds = 1 + m->nfds } ;
+ fds[0] = store_text(s, len) ;
+ if (fds[0] < 0)
+ {
+ unixmessage_drop(m) ;
+ return answer(c, errno) ;
+ }
+ fill_msginfo(pack + 1, c->idstr, cidlen, 1) ;
+ for (unsigned int i = 0 ; i < m->nfds ; i++) fds[1+i] = m->fds[i] ;
+ if (!unixmessage_put_and_close(&d->asyncout, &mtosend, unixmessage_bits_closeall))
+ {
+ unixmessage_drop(m) ;
+ return answer(c, errno) ;
+ }
+ client_setdeadline(d) ;
+ pack[0] = 0 ;
+ mtosend.s = pack ;
+ mtosend.len = 9 ;
+ mtosend.fds = 0 ;
+ mtosend.nfds = 0 ;
+ return enqueue_message(cc, &mtosend) ;
+ }
+}
+
+static int do_send (uint32_t cc, unixmessage_t const *m)
+{
+ client_t *c = CLIENT(cc) ;
+ size_t cidlen = strlen(c->idstr) ;
+ if (m->nfds > UNIXMESSAGE_MAXFDS - 1)
+ {
+ unixmessage_drop(m) ;
+ return answer(c, ENFILE) ;
+ }
+ {
+ char pack[1 + MSGINFO_PACK(cidlen)] ;
+ int fds[1 + m->nfds] ;
+ unixmessage_t mtosend = { .s = pack + 1, .len = MSGINFO_PACK(cidlen), .fds = fds, .nfds = 1 + m->nfds } ;
+ fds[0] = store_text(m->s, m->len) ;
+ if (fds[0] < 0)
+ {
+ unixmessage_drop(m) ;
+ return answer(c, errno) ;
+ }
+ fill_msginfo(pack + 1, c->idstr, cidlen, 0) ;
+ for (unsigned int i = 0 ; i < m->nfds ; i++) fds[1+i] = m->fds[i] ;
+ if (!avltree_iter_withcancel(&c->subscribers, &sendmessage_iter, &unsendmessage_iter, &mtosend))
+ {
+ unixmessage_drop(m) ;
+ return answer(c, errno) ;
+ }
+ pack[0] = 0 ;
+ mtosend.s = pack ;
+ mtosend.len = 9 ;
+ mtosend.fds = 0 ;
+ mtosend.nfds = 0 ;
+ return enqueue_message(cc, &mtosend) ;
+ }
+}
+
+static int do_error (uint32_t cc, unixmessage_t const *m)
+{
+ (void)cc ;
+ (void)m ;
+ return (errno = EPROTO, 0) ;
+}
+
+typedef int parsefunc_t (uint32_t, unixmessage_t const *) ;
+typedef parsefunc_t *parsefunc_t_ref ;
+
+static inline int parse_protocol (unixmessage_t const *m, void *p)
+{
+ static parsefunc_t_ref const f[7] =
+ {
+ &do_send,
+ &do_sendpm,
+ &do_subscribe,
+ &do_unsubscribe,
+ &do_list,
+ &do_register,
+ &do_error
+ } ;
+ unixmessage_t mcopy = { .s = m->s + 1, .len = m->len - 1, .fds = m->fds, .nfds = m->nfds } ;
+ if (!m->len)
+ {
+ unixmessage_drop(m) ;
+ return (errno = EPROTO, 0) ;
+ }
+ if (!(*f[byte_chr("!+SULR", 6, m->s[0])])(*(uint32_t *)p, &mcopy))
+ {
+ unixmessage_drop(m) ;
+ return 0 ;
+ }
+ return 1 ;
+}
+
+static inline int client_read (uint32_t cc, iopause_fd const *x)
+{
+ client_t *c = CLIENT(cc) ;
+ if (!unixmessage_receiver_isempty(&c->sync.in) || (c->xindexsync && (x[c->xindexsync].revents & IOPAUSE_READ)))
+ {
+ if (unixmessage_sender_fd(&c->asyncout) < 0)
+ {
+ unixmessage_t m ;
+ int r = unixmessage_receive(&c->sync.in, &m) ;
+ if (r < 0) return -1 ;
+ if (r)
+ {
+ unsigned int flags = -(unixmessage_sender_fd(&c->asyncout) + 1) ;
+ if (!skaclient_server_ack(&m, &c->sync.out, &c->asyncout, SKABUS_PUB_BANNER1, SKABUS_PUB_BANNER1_LEN, SKABUS_PUB_BANNER2, SKABUS_PUB_BANNER2_LEN))
+ {
+ unixmessage_drop(&m) ;
+ return -1 ;
+ }
+ if (!(flags & 1)) unixmessage_receiver_refuse_fds(&c->sync.in) ;
+ }
+ }
+ else
+ {
+ int r = unixmessage_handle(&c->sync.in, &parse_protocol, &cc) ;
+ if (r <= 0) return r ;
+ }
+ }
+ return 1 ;
+}
+
+
+ /* Environment on new connections */
+
+static int makere (regex_t *re, char const *s, char const *var)
+{
+ size_t varlen = strlen(var) ;
+ if (str_start(s, var) && (s[varlen] == '='))
+ {
+ int r = regcomp(re, s + varlen + 1, REG_EXTENDED | REG_NOSUB) ;
+ if (r)
+ {
+ if (verbosity)
+ {
+ char buf[256] ;
+ regerror(r, re, buf, 256) ;
+ strerr_warnw6x("invalid ", var, " value: ", s + varlen + 1, ": ", buf) ;
+ }
+ return -1 ;
+ }
+ else return 1 ;
+ }
+ return 0 ;
+}
+
+static void defaultre (regex_t *re, int flag)
+{
+ char const *s = flag ? ".*" : "^$" ;
+ int r = regcomp(re, s, REG_EXTENDED | REG_NOSUB) ;
+ if (r)
+ {
+ char buf[256] ;
+ regerror(r, re, buf, 256) ;
+ strerr_diefu4x(100, "compile ", s, " into a regular expression: ", buf) ;
+ }
+}
+
+static inline int parse_env (char const *const *envp, regex_t *idstr_re, unsigned int *flags)
+{
+ unsigned int fl = 0 ;
+ int idstr_done = 0 ;
+ for (; *envp ; envp++)
+ {
+ if (str_start(*envp, "SKABUS_PUB_SENDFDS=")) fl |= 1 ;
+ if (!idstr_done)
+ {
+ idstr_done = makere(idstr_re, *envp, "SKABUS_PUB_ID_REGEX") ;
+ if (idstr_done < 0) return 0 ;
+ }
+ if (idstr_done) return 1 ;
+ }
+ if (!idstr_done) defaultre(idstr_re, flagidstrpub) ;
+ *flags = fl ;
+ return 1 ;
+}
+
+static inline int new_connection (int fd, regex_t *idstr_re, unsigned int *flags)
+{
+ s6_accessrules_params_t params = S6_ACCESSRULES_PARAMS_ZERO ;
+ s6_accessrules_result_t result = S6_ACCESSRULES_ERROR ;
+ uid_t uid ;
+ gid_t gid ;
+
+ if (getpeereid(fd, &uid, &gid) < 0)
+ {
+ if (verbosity) strerr_warnwu1sys("getpeereid") ;
+ return 0 ;
+ }
+ switch (rulestype)
+ {
+ case 0 :
+ result = S6_ACCESSRULES_ALLOW ; break ;
+ case 1 :
+ result = s6_accessrules_uidgid_fs(uid, gid, rules, &params) ; break ;
+ case 2 :
+ result = s6_accessrules_uidgid_cdb(uid, gid, &cdbmap, &params) ; break ;
+ default : break ;
+ }
+ if (result != S6_ACCESSRULES_ALLOW)
+ {
+ if (verbosity && (result == S6_ACCESSRULES_ERROR))
+ strerr_warnw1sys("error while checking rules") ;
+ return 0 ;
+ }
+ if (params.exec.s)
+ {
+ stralloc_free(&params.exec) ;
+ if (verbosity)
+ {
+ char fmtuid[UID_FMT] ;
+ char fmtgid[GID_FMT] ;
+ fmtuid[uid_fmt(fmtuid, uid)] = 0 ;
+ fmtgid[gid_fmt(fmtgid, gid)] = 0 ;
+ strerr_warnw4x("unused exec string in rules for uid ", fmtuid, " gid ", fmtgid) ;
+ }
+ }
+ if (params.env.s)
+ {
+ size_t n = byte_count(params.env.s, params.env.len, '\0') ;
+ char const *envp[n+1] ;
+ if (!env_make(envp, n, params.env.s, params.env.len))
+ {
+ if (verbosity) strerr_warnwu1sys("env_make") ;
+ stralloc_free(&params.env) ;
+ return 0 ;
+ }
+ envp[n] = 0 ;
+ if (!parse_env(envp, idstr_re, flags))
+ {
+ if (verbosity) strerr_warnwu1sys("parse_env") ;
+ s6_accessrules_params_free(&params) ;
+ return 0 ;
+ }
+ s6_accessrules_params_free(&params) ;
+ }
+ return 1 ;
+}
+
+int main (int argc, char const *const *argv, char const *const *envp)
+{
+ int spfd ;
+ int flag1 = 0 ;
+ char const *announce_re = "^$" ;
+ unsigned int maxconn = 40 ;
+ PROG = "skabus-pubd" ;
+
+ {
+ subgetopt_t l = SUBGETOPT_ZERO ;
+ unsigned int t = 0, T = 0 ;
+ for (;;)
+ {
+ int opt = subgetopt_r(argc, argv, "v:Ss1c:k:i:x:t:T:", &l) ;
+ if (opt == -1) break ;
+ switch (opt)
+ {
+ case 'v' : if (!uint0_scan(l.arg, &verbosity)) dieusage() ; break ;
+ case 'S' : flagidstrpub = 0 ; break ;
+ case 's' : flagidstrpub = 1 ; break ;
+ case '1' : flag1 = 1 ; break ;
+ case 'k' : announce_re = l.arg ; break ;
+ case 'i' : rules = l.arg ; rulestype = 1 ; break ;
+ case 'x' : rules = l.arg ; rulestype = 2 ; break ;
+ case 't' : if (!uint0_scan(l.arg, &t)) dieusage() ; break ;
+ case 'T' : if (!uint0_scan(l.arg, &T)) dieusage() ; break ;
+ case 'c' : if (!uint0_scan(l.arg, &maxconn)) dieusage() ; break ;
+ default : dieusage() ;
+ }
+ }
+ argc -= l.ind ; argv += l.ind ;
+ if (t) tain_from_millisecs(&answertto, t) ;
+ if (T) tain_from_millisecs(&lameduckdeadline, T) ;
+ }
+ if (!argc) dieusage() ;
+ if (maxconn > SKABUS_PUB_MAX) maxconn = SKABUS_PUB_MAX ;
+ if (!maxconn) maxconn = 1 ;
+ {
+ struct stat st ;
+ if (fstat(0, &st) < 0) strerr_diefu1sys(111, "fstat stdin") ;
+ if (!S_ISSOCK(st.st_mode)) strerr_dief1x(100, "stdin is not a socket") ;
+ }
+ if (flag1)
+ {
+ if (fcntl(1, F_GETFD) < 0)
+ strerr_dief1sys(100, "called with option -1 but stdout said") ;
+ }
+ else fd_close(1) ;
+ {
+ struct stat st ;
+ if (stat(argv[0], &st) < 0)
+ strerr_diefu2sys(111, "stat ", argv[0]) ;
+ if (!S_ISDIR(st.st_mode))
+ {
+ errno = ENOTDIR ;
+ strerr_diefu2sys(100, "work in ", argv[0]) ;
+ }
+ }
+ msgfsdir = argv[0] ;
+ spfd = selfpipe_init() ;
+ if (spfd < 0) strerr_diefu1sys(111, "selfpipe_init") ;
+ if (sig_ignore(SIGPIPE) < 0) strerr_diefu1sys(111, "ignore SIGPIPE") ;
+ {
+ sigset_t set ;
+ sigemptyset(&set) ;
+ sigaddset(&set, SIGTERM) ;
+ sigaddset(&set, SIGHUP) ;
+ if (selfpipe_trapset(&set) < 0) strerr_diefu1sys(111, "trap signals") ;
+ }
+
+ if (rulestype == 2)
+ {
+ cdbfd = open_readb(rules) ;
+ if (cdbfd < 0) strerr_diefu3sys(111, "open ", rules, " for reading") ;
+ if (cdb_init(&cdbmap, cdbfd) < 0)
+ strerr_diefu2sys(111, "cdb_init ", rules) ;
+ }
+
+ { /* I present to you: the stack */
+ GENSETB_TYPE(client_t, 1+maxconn) blob ;
+ iopause_fd x[2 + (maxconn << 1)] ;
+ AVLTREEN_DECLARE_AND_INIT(blobmap, 1+maxconn, &idstr_dtok, &idstr_cmp, &blob) ;
+
+ GENSETB_init(client_t, &blob, 1+maxconn) ;
+ clients = &blob.info ;
+ sentinel = gensetb_new(&blob) ;
+ blob.storage[sentinel].next = sentinel ;
+ blob.storage[sentinel].idstr[0] = 0 ;
+ {
+ int r = regcomp(&blob.storage[sentinel].subscribe_re, announce_re, REG_EXTENDED | REG_NOSUB) ;
+ if (r)
+ {
+ char buf[256] ;
+ regerror(r, &blob.storage[sentinel].subscribe_re, buf, 256) ;
+ strerr_dief4x(100, "invalid control regex: ", announce_re, ": ", buf) ;
+ }
+ }
+ if (regcomp(&blob.storage[sentinel].write_re, "^$", REG_NOSUB)) strerr_diefu1x(100, "regcomp ^$") ;
+ avltree_init(&blob.storage[sentinel].subscribers, 3, 3, 8, &uint32_dtok, &ptr_cmp, 0) ;
+ avltreen_insert(&blobmap, sentinel) ;
+ clientmap = &blobmap ;
+ x[0].fd = spfd ; x[0].events = IOPAUSE_READ ;
+ x[1].fd = 0 ;
+
+ if (flag1)
+ {
+ fd_write(1, "\n", 1) ;
+ fd_close(1) ;
+ }
+ tain_now_g() ;
+
+ for (;;)
+ {
+ tain_t deadline ;
+ uint32_t i = blob.storage[sentinel].next, j = 2 ;
+ int r = 1 ;
+ if (cont) tain_add_g(&deadline, &tain_infinite_relative) ;
+ else deadline = lameduckdeadline ;
+ x[1].events = (cont && (numconn < maxconn)) ? IOPAUSE_READ : 0 ;
+ for (; i != sentinel ; i = blob.storage[i].next)
+ if (client_prepare_iopause(i, &deadline, x, &j)) r = 0 ;
+ if (!cont && r) break ;
+
+ r = iopause_g(x, j, &deadline) ;
+ if (r < 0) strerr_diefu1sys(111, "iopause") ;
+
+ if (!r)
+ {
+ for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next)
+ if (!tain_future(&blob.storage[i].deadline))
+ {
+ char what[2] = "-" ;
+ what[1] = ETIMEDOUT ;
+ if (!announce(what, 2, CLIENT(i)->idstr)) dienomem() ;
+ remove(&i, j) ;
+ }
+ continue ;
+ }
+
+ if (x[0].revents & IOPAUSE_READ) handle_signals() ;
+
+ for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next)
+ if (!client_flush(i, x)) remove(&i, j) ;
+
+ for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next)
+ switch (client_read(i, x))
+ {
+ case 0 : errno = 0 ;
+ case -1 :
+ case -2 :
+ {
+ char what[2] = "-" ;
+ what[1] = errno ;
+ if (!announce(what, 2, CLIENT(i)->idstr)) dienomem() ;
+ }
+ remove(&i, j) ;
+ case 1 : break ;
+ default : errno = EILSEQ ; die() ;
+ }
+
+ if (x[1].revents & IOPAUSE_READ)
+ {
+ regex_t idstr_re ;
+ unsigned int flags = 0 ;
+ int fd = ipc_accept_nb(x[1].fd, 0, 0, 0) ;
+ if (fd < 0)
+ if (!error_isagain(errno)) strerr_diefu1sys(111, "accept") ;
+ else continue ;
+ else if (!new_connection(fd, &idstr_re, &flags)) fd_close(fd) ;
+ else client_add(fd, &idstr_re, flags) ;
+ }
+ }
+ }
+ return 0 ;
+}