diff options
Diffstat (limited to 'src')
29 files changed, 1815 insertions, 1 deletions
diff --git a/src/include/skabus/pub.h b/src/include/skabus/pub.h new file mode 100644 index 0000000..68168d0 --- /dev/null +++ b/src/include/skabus/pub.h @@ -0,0 +1,168 @@ +/* ISC license. */ + +#ifndef SKABUS_PUB_H +#define SKABUS_PUB_H + +#include <stdint.h> +#include <sys/uio.h> + +#include <skalibs/uint64.h> +#include <skalibs/diuint32.h> +#include <skalibs/tai.h> +#include <skalibs/stralloc.h> +#include <skalibs/genalloc.h> +#include <skalibs/unixmessage.h> +#include <skalibs/skaclient.h> + + +/* Misc constants */ + +#define SKABUS_PUB_MAX 4000 +#define SKABUS_PUB_BANNER1 "skabus_pub v1.0 (b)\n" +#define SKABUS_PUB_BANNER1_LEN (sizeof SKABUS_PUB_BANNER1 - 1) +#define SKABUS_PUB_BANNER2 "skabus_pub v1.0 (a)\n" +#define SKABUS_PUB_BANNER2_LEN (sizeof SKABUS_PUB_BANNER2 - 1) +#define SKABUS_PUB_IDSTR_SIZE 254 + + + /* skabus_pub auxiliary data */ + +typedef struct skabus_pub_msginfo_s skabus_pub_msginfo_t, *skabus_pub_msginfo_t_ref ; +struct skabus_pub_msginfo_s +{ + uint64_t serial ; + tain_t timestamp ; + uint8_t flags ; + char sender[SKABUS_PUB_IDSTR_SIZE + 1] ; +} ; +#define SKABUS_PUB_MSGINFO_ZERO { .serial = 0, .timestamp = TAIN_ZERO, .flags = 0, .sender = "" } + + + /* internal client message storage */ + +typedef struct skabus_pub_cltinfo_s skabus_pub_cltinfo_t, *skabus_pub_cltinfo_t_ref ; +struct skabus_pub_cltinfo_s +{ + skabus_pub_msginfo_t msginfo ; + int fd ; + size_t nfds ; + int *fds ; +} ; +#define SKABUS_PUB_CLTINFO_ZERO { .msginfo = SKABUS_PUB_MSGINFO_ZERO, .fd = -1, .nfds = 0, .fds = 0 } + + + /* skabus_pub client connection */ + +typedef struct skabus_pub_s skabus_pub_t, *skabus_pub_t_ref ; +struct skabus_pub_s +{ + skaclient_t connection ; + genalloc info ; /* array of skabus_pub_cltinfo_t */ + size_t head ; + skaclient_buffer_t buffers ; +} ; +#define SKABUS_PUB_ZERO { .connection = SKACLIENT_ZERO, .info = GENALLOC_ZERO, .head = 0 } + + + /* Starting and ending a session */ + +typedef struct skabus_pub_start_result_s skabus_pub_start_result_t, *skabus_pub_start_result_t_ref ; +struct skabus_pub_start_result_s +{ + skaclient_cbdata_t skaclient_cbdata ; +} ; + +#define skabus_pub_init(a, path, id, sre, wre, deadline, stamp) (skabus_pub_start(a, path, deadline, stamp) && skabus_pub_register(a, id, sre, wre, deadline, stamp)) +#define skabus_pub_init_g(a, path, id, sre, wre, deadline) skabus_pub_init(a, path, id, sre, wre, (deadline), &STAMP) + +extern int skabus_pub_start_async (skabus_pub_t *, char const *, skabus_pub_start_result_t *) ; +extern int skabus_pub_start (skabus_pub_t *, char const *, tain_t const *, tain_t *) ; +#define skabus_pub_start_g(a, path, deadline) skabus_pub_start(a, path, (deadline), &STAMP) + +extern int skabus_pub_register_async (skabus_pub_t *, char const *, char const *, char const *, unsigned char *) ; +extern int skabus_pub_register (skabus_pub_t *, char const *, char const *, char const *, tain_t const *, tain_t *) ; +#define skabus_pub_register_g(a, id, sre, wre, deadline) skabus_pub_register(a, id, sre, wre, (deadline), &STAMP) + +extern void skabus_pub_end (skabus_pub_t *) ; + + + /* Reading messages */ + +#define skabus_pub_fd(a) skaclient_fd(&(a)->connection) +extern int skabus_pub_update (skabus_pub_t *) ; +extern int skabus_pub_message_getnfds (skabus_pub_t const *) ; +extern size_t skabus_pub_message_get (skabus_pub_t *, skabus_pub_msginfo_t *, int *, int *) ; + + + /* Sending public messages */ + +typedef struct skabus_pub_send_result_s skabus_pub_send_result_t, *skabus_pub_send_result_t_ref ; +struct skabus_pub_send_result_s +{ + uint64_t u ; + unsigned char err ; +} ; + +extern int skabus_pub_send_withfds_async (skabus_pub_t *, char const *, size_t, int const *, unsigned int, unsigned char const *, skabus_pub_send_result_t *) ; +#define skabus_pub_send_async(a, s, len, res) skabus_pub_send_withfds_async(a, s, len, 0, 0, unixmessage_bits_closenone, res) + +extern uint64_t skabus_pub_send_withfds (skabus_pub_t *, char const *, size_t, int const *, unsigned int, unsigned char const *, tain_t const *, tain_t *) ; +#define skabus_pub_send_withfds_g(a, s, len, fds, nfds, bits, deadline) skabus_pub_send_withfds(a, s, len, fds, nfds, bits, (deadline), &STAMP) +#define skabus_pub_send(a, s, len, deadline, stamp) skabus_pub_send_withfds(a, s, len, 0, 0, unixmessage_bits_closenone, deadline, stamp) +#define skabus_pub_send_g(a, s, len, deadline) skabus_pub_send(a, s, len, (deadline), &STAMP) + +extern int skabus_pub_sendv_withfds_async (skabus_pub_t *, struct iovec const *, unsigned int, int const *, unsigned int, unsigned char const *, skabus_pub_send_result_t *) ; +#define skabus_pub_sendv_async(a, v, vlen, res) skabus_pub_sendv_withfds_async(a, v, vlen, 0, 0, unixmessage_bits_closenone, res) + +extern uint64_t skabus_pub_sendv_withfds (skabus_pub_t *, struct iovec const *, unsigned int, int const *, unsigned int, unsigned char const *, tain_t const *, tain_t *) ; +#define skabus_pub_sendv_withfds_g(a, v, vlen, fds, nfds, bits, deadline) skabus_pub_sendv_withfds(a, v, vlen, fds, nfds, bits, (deadline), &STAMP) +#define skabus_pub_sendv(a, v, vlen, deadline, stamp) skabus_pub_sendv_withfds(a, v, vlen, 0, 0, unixmessage_bits_closenone, deadline, stamp) +#define skabus_pub_sendv_g(a, v, vlen, deadline) skabus_pub_sendv(a, v, vlen, (deadline), &STAMP) + + + /* Sending private messages */ + +extern int skabus_pub_sendpm_withfds_async (skabus_pub_t *, char const *, char const *, size_t, int const *, unsigned int, unsigned char const *, skabus_pub_send_result_t *) ; +#define skabus_pub_sendpm_async(a, id, s, len, res) skabus_pub_sendpm_withfds_async(a, id, s, len, 0, 0, unixmessage_bits_closenone, res) + +extern uint64_t skabus_pub_sendpm_withfds (skabus_pub_t *, char const *, char const *, size_t, int const *, unsigned int, unsigned char const *, tain_t const *, tain_t *) ; +#define skabus_pub_sendpm_withfds_g(a, id, s, len, fds, nfds, bits, deadline) skabus_pub_sendpm_withfds(a, id, s, len, fds, nfds, bits, (deadline), &STAMP) +#define skabus_pub_sendpm(a, id, s, len, deadline, stamp) skabus_pub_sendpm_withfds(a, id, s, len, 0, 0, unixmessage_bits_closenone, deadline, stamp) +#define skabus_pub_sendpm_g(a, id, s, len, deadline) skabus_pub_sendpm(a, id, s, len, (deadline), &STAMP) + +extern int skabus_pub_sendvpm_withfds_async (skabus_pub_t *, char const *, struct iovec const *, unsigned int, int const *, unsigned int, unsigned char const *, skabus_pub_send_result_t *) ; +#define skabus_pub_sendvpm_async(a, id, v, vlen, res) skabus_pub_sendvpm_withfds_async(a, id, v, vlen, 0, 0, unixmessage_bits_closenone, res) + +extern uint64_t skabus_pub_sendvpm_withfds (skabus_pub_t *, char const *, struct iovec const *, unsigned int, int const *, unsigned int, unsigned char const *, tain_t const *, tain_t *) ; +#define skabus_pub_sendvpm_withfds_g(a, id, v, vlen, fds, nfds, bits, deadline) skabus_pub_sendvpm_withfds(a, id, v, vlen, fds, nfds, bits, (deadline), &STAMP) +#define skabus_pub_sendvpm(a, id, v, vlen, deadline, stamp) skabus_pub_sendvpm_withfds(a, id, v, vlen, 0, 0, unixmessage_bits_closenone, deadline, stamp) +#define skabus_pub_sendvpm_g(a, id, v, vlen, deadline) skabus_pub_sendvpm(a, id, v, vlen, (deadline), &STAMP) + + + /* Subscribing to a sender */ + +extern int skabus_pub_subunsub_async (skabus_pub_t *, char, char const *, unsigned char *) ; +#define skabus_pub_subscribe_async(a, id, err) skabus_pub_subunsub_async(a, 'S', id, err) +#define skabus_pub_unsubscribe_async(a, id, err) skabus_pub_subunsub_async(a, 'U', id, err) +extern int skabus_pub_subunsub (skabus_pub_t *, char, char const *, tain_t const *, tain_t *) ; +#define skabus_pub_subscribe(a, id, deadline, stamp) skabus_pub_subunsub(a, 'S', id, deadline, stamp) +#define skabus_pub_subscribe_g(a, id, deadline) skabus_pub_subscribe(a, id, (deadline), &STAMP) +#define skabus_pub_unsubscribe(a, id, deadline, stamp) skabus_pub_subunsub(a, 'U', id, deadline, stamp) +#define skabus_pub_unsubscribe_g(a, id, deadline) skabus_pub_unsubscribe(a, id, (deadline), &STAMP) + + + /* Listing all clients */ + +typedef struct skabus_pub_list_result_s skabus_pub_list_result_t, *skabus_pub_list_result_t_ref ; +struct skabus_pub_list_result_s +{ + stralloc *sa ; + diuint32 n ; + unsigned char err ; +} ; + +extern int skabus_pub_list_async (skabus_pub_t *, stralloc *, skabus_pub_list_result_t *) ; +extern int skabus_pub_list (skabus_pub_t *, stralloc *, diuint32 *, tain_t const *, tain_t *) ; +#define skabus_pub_list_g(a, sa, deadline) skabus_pub_list(a, sa, (deadline), &STAMP) + +#endif diff --git a/src/include/skabus/skabus.h b/src/include/skabus/skabus.h index 1c07537..d0e8891 100644 --- a/src/include/skabus/skabus.h +++ b/src/include/skabus/skabus.h @@ -7,6 +7,7 @@ extern "C" { #endif +#include <skabus/pub.h> #include <skabus/rpc.h> #ifdef __cplusplus diff --git a/src/libskabus/deps-lib/skabus b/src/libskabus/deps-lib/skabus index 21582c7..a7a8369 100644 --- a/src/libskabus/deps-lib/skabus +++ b/src/libskabus/deps-lib/skabus @@ -1,3 +1,24 @@ +skabus_pub_end.o +skabus_pub_list.o +skabus_pub_list_async.o +skabus_pub_message_get.o +skabus_pub_message_getnfds.o +skabus_pub_register.o +skabus_pub_register_async.o +skabus_pub_send.o +skabus_pub_send_async.o +skabus_pub_send_cb.o +skabus_pub_sendpm.o +skabus_pub_sendpm_async.o +skabus_pub_sendv.o +skabus_pub_sendv_async.o +skabus_pub_sendvpm.o +skabus_pub_sendvpm_async.o +skabus_pub_start.o +skabus_pub_start_async.o +skabus_pub_subunsub.o +skabus_pub_subunsub_async.o +skabus_pub_update.o skabus_rpc_cancel.o skabus_rpc_cancel_async.o skabus_rpc_end.o @@ -17,8 +38,8 @@ skabus_rpc_r_notimpl.o skabus_rpc_rcancel_ignore.o skabus_rpc_release.o skabus_rpc_reply.o -skabus_rpc_replyv.o skabus_rpc_reply_async.o +skabus_rpc_replyv.o skabus_rpc_replyv_async.o skabus_rpc_rinfo_pack.o skabus_rpc_rinfo_unpack.o diff --git a/src/libskabus/skabus-pub-internal.h b/src/libskabus/skabus-pub-internal.h new file mode 100644 index 0000000..efe8bb9 --- /dev/null +++ b/src/libskabus/skabus-pub-internal.h @@ -0,0 +1,12 @@ +/* ISC license. */ + +#ifndef SKABUS_PUB_INTERNAL_H +#define SKABUS_PUB_INTERNAL_H + +#include <skalibs/unixmessage.h> + +#define SKABUS_HEAD_MAX 64 + +extern unixmessage_handler_func_t skabus_pub_send_cb ; + +#endif diff --git a/src/libskabus/skabus_pub_end.c b/src/libskabus/skabus_pub_end.c new file mode 100644 index 0000000..579e83e --- /dev/null +++ b/src/libskabus/skabus_pub_end.c @@ -0,0 +1,25 @@ +/* ISC license. */ + +#include <skalibs/alloc.h> +#include <skalibs/djbunix.h> +#include <skalibs/genalloc.h> +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +static void skabus_pub_cltinfo_free (skabus_pub_cltinfo_t *p) +{ + fd_close(p->fd) ; + if (p->nfds) + { + for (size_t i = 0 ; i < p->nfds ; i++) fd_close(p->fds[i]) ; + alloc_free(p->fds) ; + } +} + +void skabus_pub_end (skabus_pub_t *a) +{ + skaclient_end(&a->connection) ; + genalloc_deepfree(skabus_pub_cltinfo_t, &a->info, &skabus_pub_cltinfo_free) ; + a->head = 0 ; +} diff --git a/src/libskabus/skabus_pub_list.c b/src/libskabus/skabus_pub_list.c new file mode 100644 index 0000000..4b8ceeb --- /dev/null +++ b/src/libskabus/skabus_pub_list.c @@ -0,0 +1,17 @@ +/* ISC license. */ + +#include <errno.h> + +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +int skabus_pub_list (skabus_pub_t *a, stralloc *sa, diuint32 *n, tain_t const *deadline, tain_t *stamp) +{ + skabus_pub_list_result_t r ; + if (!skabus_pub_list_async(a, sa, &r)) return 0 ; + if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ; + if (r.err) return (errno = r.err, 0) ; + *n = r.n ; + return 1 ; +} diff --git a/src/libskabus/skabus_pub_list_async.c b/src/libskabus/skabus_pub_list_async.c new file mode 100644 index 0000000..48797b5 --- /dev/null +++ b/src/libskabus/skabus_pub_list_async.c @@ -0,0 +1,45 @@ +/* ISC license. */ + +#include <stdint.h> +#include <errno.h> + +#include <skalibs/posixishard.h> +#include <skalibs/uint32.h> +#include <skalibs/stralloc.h> +#include <skalibs/unixmessage.h> +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +static int skabus_pub_list_cb (unixmessage_t const *m, void *p) +{ + skabus_pub_list_result_t *r = p ; + uint32_t n ; + size_t w = 9, len = m->len - 9 ; + if (m->len < 9 || m->nfds) goto errproto ; + r->err = m->s[0] ; + if (r->err) return 1 ; + uint32_unpack_big(m->s + 1, &n) ; + if (n > 0x7fffffffu || m->len < 9 + (n<<1)) goto errproto ; + if (!stralloc_readyplus(r->sa, m->len - 9 - n)) goto errproto ; + r->n.left = n ; + uint32_unpack_big(m->s + 5, &n) ; r->n.right = n ; + for (uint32_t i = 0 ; i < r->n.left ; i++) + { + size_t thislen ; + if (len < 2) goto errproto ; + thislen = m->s[w++] + 1 ; len-- ; + if ((thislen > len) || m->s[w + thislen]) goto errproto ; + stralloc_catb(r->sa, m->s + w, thislen) ; + w += thislen ; len -= thislen ; + } + return 1 ; + errproto: + return (errno = EPROTO, 0) ; +} + +int skabus_pub_list_async (skabus_pub_t *a, stralloc *sa, skabus_pub_list_result_t *result) +{ + result->sa = sa ; + return skaclient_put(&a->connection, "L", 1, &skabus_pub_list_cb, result) ; +} diff --git a/src/libskabus/skabus_pub_message_get.c b/src/libskabus/skabus_pub_message_get.c new file mode 100644 index 0000000..aabefaa --- /dev/null +++ b/src/libskabus/skabus_pub_message_get.c @@ -0,0 +1,32 @@ +/* ISC license. */ + +#include <string.h> + +#include <skalibs/alloc.h> +#include <skalibs/genalloc.h> + +#include <skabus/pub.h> +#include "skabus-pub-internal.h" + +size_t skabus_pub_message_get (skabus_pub_t *a, skabus_pub_msginfo_t *info, int *fd, int *fds) +{ + size_t n = genalloc_len(skabus_pub_cltinfo_t, &a->info) ; + skabus_pub_cltinfo_t *p = genalloc_s(skabus_pub_cltinfo_t, &a->info) + a->head ; + if (!n) return 0 ; + *info = p->msginfo ; + *fd = p->fd ; + if (p->nfds) + { + for (size_t i = 0 ; i < p->nfds ; i++) fds[i] = p->fds[i] ; + alloc_free(p->fds) ; + } + + if (++a->head == n || a->head > SKABUS_HEAD_MAX) + { + n -= a->head ; + memmove(a->info.s, a->info.s + a->head * sizeof(skabus_pub_cltinfo_t), n * sizeof(skabus_pub_cltinfo_t)) ; + genalloc_setlen(skabus_pub_cltinfo_t, &a->info, n) ; + a->head = 0 ; + } + return 1 + n - a->head ; +} diff --git a/src/libskabus/skabus_pub_message_getnfds.c b/src/libskabus/skabus_pub_message_getnfds.c new file mode 100644 index 0000000..828a42d --- /dev/null +++ b/src/libskabus/skabus_pub_message_getnfds.c @@ -0,0 +1,11 @@ +/* ISC license. */ + +#include <skalibs/genalloc.h> + +#include <skabus/pub.h> + +int skabus_pub_message_getnfds (skabus_pub_t const *a) +{ + return genalloc_len(skabus_pub_cltinfo_t, &a->info) <= a->head ? -1 : + (int)genalloc_s(skabus_pub_cltinfo_t, &a->info)[a->head].nfds ; +} diff --git a/src/libskabus/skabus_pub_register.c b/src/libskabus/skabus_pub_register.c new file mode 100644 index 0000000..30a08a3 --- /dev/null +++ b/src/libskabus/skabus_pub_register.c @@ -0,0 +1,15 @@ +/* ISC license. */ + +#include <errno.h> + +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +int skabus_pub_register (skabus_pub_t *a, char const *id, char const *sre, char const *wre, tain_t const *deadline, tain_t *stamp) +{ + unsigned char r ; + if (!skabus_pub_register_async(a, id, sre, wre, &r)) return 0 ; + if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ; + return r ? (errno = r, 0) : 1 ; +} diff --git a/src/libskabus/skabus_pub_register_async.c b/src/libskabus/skabus_pub_register_async.c new file mode 100644 index 0000000..2736bcc --- /dev/null +++ b/src/libskabus/skabus_pub_register_async.c @@ -0,0 +1,30 @@ +/* ISC license. */ + +#include <string.h> +#include <errno.h> +#include <sys/uio.h> + +#include <skalibs/uint32.h> +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +int skabus_pub_register_async (skabus_pub_t *a, char const *id, char const *sre, char const *wre, unsigned char *err) +{ + size_t idlen = strlen(id) ; + size_t srelen = strlen(sre) ; + size_t wrelen = strlen(wre) ; + char pack[10] = "R" ; + struct iovec v[4] = + { + { .iov_base = pack, .iov_len = 10 }, + { .iov_base = (char *)id, .iov_len = idlen+1 }, + { .iov_base = (char *)sre, .iov_len = srelen+1 }, + { .iov_base = (char *)wre, .iov_len = wrelen+1 } + } ; + if (idlen > SKABUS_PUB_IDSTR_SIZE) return (errno = ERANGE, 0) ; + pack[1] = (unsigned char)idlen ; + uint32_pack_big(pack+2, srelen) ; + uint32_pack_big(pack+6, wrelen) ; + return skaclient_putv(&a->connection, v, 4, &skaclient_default_cb, err) ; +} diff --git a/src/libskabus/skabus_pub_send.c b/src/libskabus/skabus_pub_send.c new file mode 100644 index 0000000..f41ede3 --- /dev/null +++ b/src/libskabus/skabus_pub_send.c @@ -0,0 +1,15 @@ +/* ISC license. */ + +#include <errno.h> + +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +uint64_t skabus_pub_send_withfds (skabus_pub_t *a, char const *s, size_t len, int const *fds, unsigned int nfds, unsigned char const *bits, tain_t const *deadline, tain_t *stamp) +{ + skabus_pub_send_result_t r ; + if (!skabus_pub_send_withfds_async(a, s, len, fds, nfds, bits, &r)) return 0 ; + if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ; + return r.err ? (errno = r.err, 0) : r.u ; +} diff --git a/src/libskabus/skabus_pub_send_async.c b/src/libskabus/skabus_pub_send_async.c new file mode 100644 index 0000000..1a2bb58 --- /dev/null +++ b/src/libskabus/skabus_pub_send_async.c @@ -0,0 +1,16 @@ +/* ISC license. */ + +#include <sys/uio.h> + +#include <skalibs/unixmessage.h> +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> +#include "skabus-pub-internal.h" + +int skabus_pub_send_withfds_async (skabus_pub_t *a, char const *s, size_t len, int const *fds, unsigned int nfds, unsigned char const *bits, skabus_pub_send_result_t *result) +{ + struct iovec v[2] = { { .iov_base = "!", .iov_len = 1 }, { .iov_base = (char *)s, .iov_len = len } } ; + unixmessage_v_t m = { .v = v, .vlen = 2, .fds = (int *)fds, .nfds = nfds } ; + return skaclient_putmsgv_and_close(&a->connection, &m, bits, &skabus_pub_send_cb, result) ; +} diff --git a/src/libskabus/skabus_pub_send_cb.c b/src/libskabus/skabus_pub_send_cb.c new file mode 100644 index 0000000..e95ab6c --- /dev/null +++ b/src/libskabus/skabus_pub_send_cb.c @@ -0,0 +1,25 @@ +/* ISC license. */ + +#include <errno.h> + +#include <skalibs/posixishard.h> +#include <skalibs/uint64.h> +#include <skalibs/unixmessage.h> + +#include <skabus/pub.h> + +int skabus_pub_send_cb (unixmessage_t const *m, void *p) +{ + skabus_pub_send_result_t *r = p ; + if (!m->len || m->nfds) return (errno = EPROTO, 0) ; + if (m->s[0]) + { + r->u = 0 ; + r->err = m->s[0] ; + return 1 ; + } + if (m->len != 9) return (errno = EPROTO, 0) ; + uint64_unpack_big(m->s + 1, &r->u) ; + r->err = 0 ; + return 1 ; +} diff --git a/src/libskabus/skabus_pub_sendpm.c b/src/libskabus/skabus_pub_sendpm.c new file mode 100644 index 0000000..073633a --- /dev/null +++ b/src/libskabus/skabus_pub_sendpm.c @@ -0,0 +1,15 @@ +/* ISC license. */ + +#include <errno.h> + +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +uint64_t skabus_pub_sendpm_withfds (skabus_pub_t *a, char const *id, char const *s, size_t len, int const *fds, unsigned int nfds, unsigned char const *bits, tain_t const *deadline, tain_t *stamp) +{ + skabus_pub_send_result_t r ; + if (!skabus_pub_sendpm_withfds_async(a, id, s, len, fds, nfds, bits, &r)) return 0 ; + if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ; + return r.err ? (errno = r.err, 0) : r.u ; +} diff --git a/src/libskabus/skabus_pub_sendpm_async.c b/src/libskabus/skabus_pub_sendpm_async.c new file mode 100644 index 0000000..d43b01f --- /dev/null +++ b/src/libskabus/skabus_pub_sendpm_async.c @@ -0,0 +1,27 @@ +/* ISC license. */ + +#include <string.h> +#include <sys/uio.h> +#include <errno.h> + +#include <skalibs/unixmessage.h> +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> +#include "skabus-pub-internal.h" + +int skabus_pub_sendpm_withfds_async (skabus_pub_t *a, char const *idstr, char const *s, size_t len, int const *fds, unsigned int nfds, unsigned char const *bits, skabus_pub_send_result_t *result) +{ + size_t idlen = strlen(idstr) ; + char tmp[2] = "+" ; + struct iovec v[3] = + { + { .iov_base = tmp, .iov_len = 2 }, + { .iov_base = (char *)idstr, .iov_len = idlen+1 }, + { .iov_base = (char *)s, .iov_len = len } + } ; + unixmessage_v_t m = { .v = v, .vlen = 3, .fds = (int *)fds, .nfds = nfds } ; + if (idlen > SKABUS_PUB_IDSTR_SIZE) return (errno = ERANGE, 0) ; + tmp[1] = (unsigned char)idlen ; + return skaclient_putmsgv_and_close(&a->connection, &m, bits, &skabus_pub_send_cb, result) ; +} diff --git a/src/libskabus/skabus_pub_sendv.c b/src/libskabus/skabus_pub_sendv.c new file mode 100644 index 0000000..ff1c743 --- /dev/null +++ b/src/libskabus/skabus_pub_sendv.c @@ -0,0 +1,16 @@ +/* ISC license. */ + +#include <errno.h> + +#include <skalibs/uint64.h> +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +uint64_t skabus_pub_sendv_withfds (skabus_pub_t *a, struct iovec const *v, unsigned int vlen, int const *fds, unsigned int nfds, unsigned char const *bits, tain_t const *deadline, tain_t *stamp) +{ + skabus_pub_send_result_t r ; + if (!skabus_pub_sendv_withfds_async(a, v, vlen, fds, nfds, bits, &r)) return 0 ; + if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ; + return r.err ? (errno = r.err, 0) : r.u ; +} diff --git a/src/libskabus/skabus_pub_sendv_async.c b/src/libskabus/skabus_pub_sendv_async.c new file mode 100644 index 0000000..54ce94e --- /dev/null +++ b/src/libskabus/skabus_pub_sendv_async.c @@ -0,0 +1,18 @@ +/* ISC license. */ + +#include <sys/uio.h> + +#include <skalibs/unixmessage.h> +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> +#include "skabus-pub-internal.h" + +int skabus_pub_sendv_withfds_async (skabus_pub_t *a, struct iovec const *v, unsigned int vlen, int const *fds, unsigned int nfds, unsigned char const *bits, skabus_pub_send_result_t *result) +{ + struct iovec vv[vlen+1] ; + unixmessage_v_t m = { .v = vv, .vlen = vlen+1, .fds = (int *)fds, .nfds = nfds } ; + vv[0].iov_base = "!" ; vv[0].iov_len = 1 ; + for (unsigned int i = 0 ; i < vlen ; i++) vv[1+i] = v[i] ; + return skaclient_putmsgv_and_close(&a->connection, &m, bits, &skabus_pub_send_cb, result) ; +} diff --git a/src/libskabus/skabus_pub_sendvpm.c b/src/libskabus/skabus_pub_sendvpm.c new file mode 100644 index 0000000..a898bbc --- /dev/null +++ b/src/libskabus/skabus_pub_sendvpm.c @@ -0,0 +1,15 @@ +/* ISC license. */ + +#include <errno.h> + +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +uint64_t skabus_pub_sendvpm_withfds (skabus_pub_t *a, char const *idstr, struct iovec const *v, unsigned int vlen, int const *fds, unsigned int nfds, unsigned char const *bits, tain_t const *deadline, tain_t *stamp) +{ + skabus_pub_send_result_t r ; + if (!skabus_pub_sendvpm_withfds_async(a, idstr, v, vlen, fds, nfds, bits, &r)) return 0 ; + if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ; + return r.err ? (errno = r.err, 0) : r.u ; +} diff --git a/src/libskabus/skabus_pub_sendvpm_async.c b/src/libskabus/skabus_pub_sendvpm_async.c new file mode 100644 index 0000000..b1bab4d --- /dev/null +++ b/src/libskabus/skabus_pub_sendvpm_async.c @@ -0,0 +1,25 @@ +/* ISC license. */ + +#include <string.h> +#include <sys/uio.h> +#include <errno.h> + +#include <skalibs/unixmessage.h> +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> +#include "skabus-pub-internal.h" + +int skabus_pub_sendvpm_withfds_async (skabus_pub_t *a, char const *idstr, struct iovec const *v, unsigned int vlen, int const *fds, unsigned int nfds, unsigned char const *bits, skabus_pub_send_result_t *result) +{ + size_t idlen = strlen(idstr) ; + char tmp[2] = "+" ; + struct iovec vv[vlen + 2] ; + unixmessage_v_t m = { .v = vv, .vlen = vlen + 2, .fds = (int *)fds, .nfds = nfds } ; + if (idlen > SKABUS_PUB_IDSTR_SIZE) return (errno = ERANGE, 0) ; + tmp[1] = (unsigned char)idlen ; + vv[0].iov_base = tmp ; vv[0].iov_len = 2 ; + vv[1].iov_base = (char *)idstr ; vv[1].iov_len = idlen+1 ; + for (unsigned int i = 0 ; i < vlen ; i++) vv[2+i] = v[i] ; + return skaclient_putmsgv_and_close(&a->connection, &m, bits, &skabus_pub_send_cb, result) ; +} diff --git a/src/libskabus/skabus_pub_start.c b/src/libskabus/skabus_pub_start.c new file mode 100644 index 0000000..d72b0f7 --- /dev/null +++ b/src/libskabus/skabus_pub_start.c @@ -0,0 +1,20 @@ +/* ISC license. */ + +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +int skabus_pub_start (skabus_pub_t *a, char const *path, tain_t const *deadline, tain_t *stamp) +{ + return skaclient_start_b( + &a->connection, + &a->buffers, + path, + SKACLIENT_OPTION_ASYNC_ACCEPT_FDS, + SKABUS_PUB_BANNER1, + SKABUS_PUB_BANNER1_LEN, + SKABUS_PUB_BANNER2, + SKABUS_PUB_BANNER2_LEN, + deadline, + stamp) ; +} diff --git a/src/libskabus/skabus_pub_start_async.c b/src/libskabus/skabus_pub_start_async.c new file mode 100644 index 0000000..3a82f41 --- /dev/null +++ b/src/libskabus/skabus_pub_start_async.c @@ -0,0 +1,19 @@ +/* ISC license. */ + +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +int skabus_pub_start_async (skabus_pub_t *a, char const *path, skabus_pub_start_result_t *data) +{ + return skaclient_start_async_b( + &a->connection, + &a->buffers, + path, + SKACLIENT_OPTION_ASYNC_ACCEPT_FDS, + SKABUS_PUB_BANNER1, + SKABUS_PUB_BANNER1_LEN, + SKABUS_PUB_BANNER2, + SKABUS_PUB_BANNER2_LEN, + &data->skaclient_cbdata) ; +} diff --git a/src/libskabus/skabus_pub_subunsub.c b/src/libskabus/skabus_pub_subunsub.c new file mode 100644 index 0000000..29e61f1 --- /dev/null +++ b/src/libskabus/skabus_pub_subunsub.c @@ -0,0 +1,15 @@ +/* ISC license. */ + +#include <errno.h> + +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +int skabus_pub_subunsub (skabus_pub_t *a, char what, char const *id, tain_t const *deadline, tain_t *stamp) +{ + unsigned char r ; + if (!skabus_pub_subunsub_async(a, what, id, &r)) return 0 ; + if (!skaclient_syncify(&a->connection, deadline, stamp)) return 0 ; + return r ? (errno = r, 0) : 1 ; +} diff --git a/src/libskabus/skabus_pub_subunsub_async.c b/src/libskabus/skabus_pub_subunsub_async.c new file mode 100644 index 0000000..d41ed98 --- /dev/null +++ b/src/libskabus/skabus_pub_subunsub_async.c @@ -0,0 +1,22 @@ +/* ISC license. */ + +#include <string.h> +#include <sys/uio.h> +#include <errno.h> + +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +int skabus_pub_subunsub_async (skabus_pub_t *a, char what, char const *id, unsigned char *err) +{ + size_t idlen = strlen(id) ; + char pack[2] = { what, (unsigned char)idlen } ; + struct iovec v[2] = + { + { .iov_base = pack, .iov_len = 2 }, + { .iov_base = (char *)id, .iov_len = idlen+1 } + } ; + if (idlen > SKABUS_PUB_IDSTR_SIZE) return (errno = ERANGE, 0) ; + return skaclient_putv(&a->connection, v, 2, &skaclient_default_cb, err) ; +} diff --git a/src/libskabus/skabus_pub_update.c b/src/libskabus/skabus_pub_update.c new file mode 100644 index 0000000..46e699c --- /dev/null +++ b/src/libskabus/skabus_pub_update.c @@ -0,0 +1,58 @@ +/* ISC license. */ + +#include <string.h> +#include <stdint.h> +#include <errno.h> + +#include <skalibs/posixishard.h> +#include <skalibs/uint64.h> +#include <skalibs/tai.h> +#include <skalibs/alloc.h> +#include <skalibs/genalloc.h> +#include <skalibs/unixmessage.h> +#include <skalibs/skaclient.h> + +#include <skabus/pub.h> + +static int handler (unixmessage_t const *m, void *x) +{ + skabus_pub_t *a = (skabus_pub_t *)x ; + size_t n = genalloc_len(skabus_pub_cltinfo_t, &a->info) ; + char const *s = m->s ; + size_t len = m->len ; + skabus_pub_cltinfo_t *p ; + uint8_t idlen ; + if (len < 11 + TAIN_PACK || !m->nfds) goto errproto ; + if (!genalloc_readyplus(skabus_pub_cltinfo_t, &a->info, 1)) goto err ; + p = genalloc_s(skabus_pub_cltinfo_t, &a->info) + n ; + p->fd = m->fds[0] ; + p->nfds = m->nfds - 1 ; + if (p->nfds > 1) + { + p->fds = alloc(p->nfds * sizeof(int)) ; + if (!p->fds) goto err ; + for (size_t i = 0 ; i < p->nfds ; i++) p->fds[i] = m->fds[1 + i] ; + } + else p->fds = 0 ; + uint64_unpack_big(s, &p->msginfo.serial) ; s += 8 ; len -= 8 ; + tain_unpack(s, &p->msginfo.timestamp) ; s += TAIN_PACK ; len -= TAIN_PACK ; + p->msginfo.flags = *s++ ; len-- ; + idlen = *s++ ; len-- ; + if (len != (size_t)idlen + 1 || s[idlen] || idlen > SKABUS_PUB_IDSTR_SIZE) goto errprotof ; + memcpy(p->msginfo.sender, s, len) ; + genalloc_setlen(skabus_pub_cltinfo_t, &a->info, n+1) ; + return 1 ; + + errprotof: + alloc_free(p->fds) ; + errproto: + errno = EPROTO ; + err: + unixmessage_drop(m) ; + return 0 ; +} + +int skabus_pub_update (skabus_pub_t *a) +{ + return skaclient_update(&a->connection, &handler, a) ; +} diff --git a/src/pub/deps-exe/skabus-pub-daemon b/src/pub/deps-exe/skabus-pub-daemon new file mode 100755 index 0000000..e7187fe --- /dev/null +++ b/src/pub/deps-exe/skabus-pub-daemon @@ -0,0 +1 @@ +-lskarnet diff --git a/src/pub/deps-exe/skabus-pubd b/src/pub/deps-exe/skabus-pubd new file mode 100755 index 0000000..ac0cace --- /dev/null +++ b/src/pub/deps-exe/skabus-pubd @@ -0,0 +1,4 @@ +-ls6 +-lskarnet +${SOCKET_LIB} +${TAINNOW_LIB} diff --git a/src/pub/skabus-pub-daemon.c b/src/pub/skabus-pub-daemon.c new file mode 100644 index 0000000..54b4359 --- /dev/null +++ b/src/pub/skabus-pub-daemon.c @@ -0,0 +1,162 @@ +/* ISC license. */ + +#include <sys/types.h> +#include <limits.h> + +#include <skalibs/types.h> +#include <skalibs/sgetopt.h> +#include <skalibs/strerr2.h> +#include <skalibs/djbunix.h> + +#include <s6/config.h> + +#include <skabus/config.h> + +#define USAGE "skabus-pub-daemon [ -v verbosity ] [ -d | -D ] [ -1 ] [ -c maxconn ] [ -b backlog ] [ -G gid,gid,... ] [ -g gid ] [ -u uid ] [ -U ] [ -t timeout ] [ -T lameducktimeout ] [ -i rulesdir | -x rulesfile ] [ -S | -s ] [ -k announcere ] path msgfsdir" +#define dieusage() strerr_dieusage(100, USAGE) + +int main (int argc, char const *const *argv, char const *const *envp) +{ + unsigned int verbosity = 1 ; + int flag1 = 0 ; + int flagU = 0 ; + int flagreuse = 1 ; + uid_t uid = 0 ; + gid_t gid = 0 ; + gid_t gids[NGROUPS_MAX] ; + size_t gidn = (unsigned int)-1 ; + unsigned int maxconn = 0 ; + unsigned int backlog = (unsigned int)-1 ; + int flagpublic = 0 ; + unsigned int timeout = 0 ; + unsigned int ltimeout = 0 ; + char const *rulesdir = 0 ; + char const *rulesfile = 0 ; + char const *announcere = 0 ; + PROG = "skabus-pub-daemon" ; + { + subgetopt_t l = SUBGETOPT_ZERO ; + for (;;) + { + int opt = subgetopt_r(argc, argv, "Dd1USsv:c:b:u:g:G:t:T:i:x:k:", &l) ; + if (opt == -1) break ; + switch (opt) + { + case 'D' : flagreuse = 0 ; break ; + case 'd' : flagreuse = 1 ; break ; + case '1' : flag1 = 1 ; break ; + case 'v' : if (!uint0_scan(l.arg, &verbosity)) dieusage() ; break ; + case 'c' : if (!uint0_scan(l.arg, &maxconn)) dieusage() ; if (!maxconn) maxconn = 1 ; break ; + case 'b' : if (!uint0_scan(l.arg, &backlog)) dieusage() ; break ; + case 'u' : if (!uid0_scan(l.arg, &uid)) dieusage() ; break ; + case 'g' : if (!gid0_scan(l.arg, &gid)) dieusage() ; break ; + case 'G' : if (!gid_scanlist(gids, NGROUPS_MAX, l.arg, &gidn) && *l.arg) dieusage() ; break ; + case 'U' : flagU = 1 ; uid = 0 ; gid = 0 ; gidn = (size_t)-1 ; break ; + case 'S' : flagpublic = 0 ; break ; + case 's' : flagpublic = 1 ; break ; + case 't' : if (!uint0_scan(l.arg, &timeout)) dieusage() ; break ; + case 'T' : if (!uint0_scan(l.arg, <imeout)) dieusage() ; break ; + case 'i' : rulesdir = l.arg ; rulesfile = 0 ; break ; + case 'x' : rulesfile = l.arg ; rulesdir = 0 ; break ; + case 'k' : announcere = l.arg ; break ; + default : dieusage() ; + } + } + argc -= l.ind ; argv += l.ind ; + if (argc < 2) dieusage() ; + } + + { + unsigned int m = 0, pos = 0 ; + char const *newargv[33] ; + char fmt[UINT_FMT * 4 + UID_FMT + GID_FMT * (1 + NGROUPS_MAX)] ; + newargv[m++] = S6_EXTBINPREFIX "s6-ipcserver-socketbinder" ; + if (!flagreuse) newargv[m++] = "-D" ; + if (backlog != (unsigned int)-1) + { + newargv[m++] = "-b" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, backlog) ; + fmt[pos++] = 0 ; + } + newargv[m++] = "--" ; + newargv[m++] = *argv++ ; + if (flagU || uid || gid || gidn != (unsigned int)-1) + { + newargv[m++] = S6_EXTBINPREFIX "s6-applyuidgid" ; + if (flagU) newargv[m++] = "-Uz" ; + if (uid) + { + newargv[m++] = "-u" ; + newargv[m++] = fmt + pos ; + pos += uid_fmt(fmt + pos, uid) ; + fmt[pos++] = 0 ; + } + if (gid) + { + newargv[m++] = "-g" ; + newargv[m++] = fmt + pos ; + pos += gid_fmt(fmt + pos, gid) ; + fmt[pos++] = 0 ; + } + if (gidn != (unsigned int)-1) + { + newargv[m++] = "-G" ; + newargv[m++] = fmt + pos ; + pos += gid_fmtlist(fmt + pos, gids, gidn) ; + fmt[pos++] = 0 ; + } + newargv[m++] = "--" ; + } + newargv[m++] = SKABUS_BINPREFIX "skabus-pubd" ; + if (verbosity != 1) + { + newargv[m++] = "-v" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, verbosity) ; + fmt[pos++] = 0 ; + } + if (flag1) newargv[m++] = "-1" ; + if (flagpublic) newargv[m++] = "-s" ; + if (maxconn) + { + newargv[m++] = "-c" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, maxconn) ; + fmt[pos++] = 0 ; + } + if (timeout) + { + newargv[m++] = "-t" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, timeout) ; + fmt[pos++] = 0 ; + } + if (ltimeout) + { + newargv[m++] = "-T" ; + newargv[m++] = fmt + pos ; + pos += uint_fmt(fmt + pos, ltimeout) ; + fmt[pos++] = 0 ; + } + if (rulesdir) + { + newargv[m++] = "-i" ; + newargv[m++] = rulesdir ; + } + else if (rulesfile) + { + newargv[m++] = "-x" ; + newargv[m++] = rulesfile ; + } + if (announcere) + { + newargv[m++] = "-k" ; + newargv[m++] = announcere ; + } + newargv[m++] = "--" ; + newargv[m++] = *argv++ ; + newargv[m++] = 0 ; + xpathexec_run(newargv[0], newargv, envp) ; + } +} diff --git a/src/pub/skabus-pubd.c b/src/pub/skabus-pubd.c new file mode 100644 index 0000000..99a6ddf --- /dev/null +++ b/src/pub/skabus-pubd.c @@ -0,0 +1,964 @@ +/* ISC license. */ + +#include <stdint.h> +#include <unistd.h> +#include <sys/stat.h> +#include <sys/uio.h> +#include <fcntl.h> +#include <errno.h> +#include <signal.h> +#include <stdlib.h> +#include <regex.h> + +#include <skalibs/posixishard.h> +#include <skalibs/uint32.h> +#include <skalibs/uint64.h> +#include <skalibs/types.h> +#include <skalibs/allreadwrite.h> +#include <skalibs/bytestr.h> +#include <skalibs/sgetopt.h> +#include <skalibs/env.h> +#include <skalibs/error.h> +#include <skalibs/strerr2.h> +#include <skalibs/stralloc.h> +#include <skalibs/tai.h> +#include <skalibs/djbunix.h> +#include <skalibs/sig.h> +#include <skalibs/iopause.h> +#include <skalibs/selfpipe.h> +#include <skalibs/cdb.h> +#include <skalibs/webipc.h> +#include <skalibs/genset.h> +#include <skalibs/avltree.h> +#include <skalibs/avltreen.h> +#include <skalibs/unixmessage.h> +#include <skalibs/unixconnection.h> +#include <skalibs/skaclient.h> + +#include <s6/accessrules.h> + +#include <skabus/pub.h> + +#define USAGE "skabus-pubd [ -v verbosity ] [ -1 ] [ -c maxconn ] [ -t timeout ] [ -T lameducktimeout ] [ -i rulesdir | -x rulesfile ] [ -S | -s ] [ -k controlre ] msgfsdir" +#define dieusage() strerr_dieusage(100, USAGE) ; +#define dienomem() strerr_diefu1sys(111, "stralloc_catb") ; +#define die() strerr_dief1sys(101, "unexpected error") ; + +#define MSGINFO_PACK(n) (11 + TAIN_PACK + (n)) + +static unsigned int verbosity = 1 ; +static int cont = 1 ; +static uint64_t serial = 1 ; +static tain_t answertto = TAIN_INFINITE_RELATIVE ; +static tain_t lameduckdeadline = TAIN_INFINITE_RELATIVE ; +static int flagidstrpub = 0 ; + +static unsigned int rulestype = 0 ; +static char const *rules = 0 ; +static int cdbfd = -1 ; +static struct cdb cdbmap = CDB_ZERO ; + +static char const *msgfsdir ; + +static void handle_signals (void) +{ + for (;;) switch (selfpipe_read()) + { + case -1 : strerr_diefu1sys(111, "selfpipe_read()") ; + case 0 : return ; + case SIGTERM : + { + if (cont) + { + cont = 0 ; + tain_add_g(&lameduckdeadline, &lameduckdeadline) ; + } + break ; + } + case SIGHUP : + { + int fd ; + struct cdb c = CDB_ZERO ; + if (rulestype != 2) break ; + fd = open_readb(rules) ; + if (fd < 0) break ; + if (cdb_init(&c, fd) < 0) + { + fd_close(fd) ; + break ; + } + cdb_free(&cdbmap) ; + fd_close(cdbfd) ; + cdbfd = fd ; + cdbmap = c ; + } + break ; + default : break ; + } +} + +static void *uint32_dtok (uint32_t d, void *x) +{ + (void)x ; + return (void *)(uintptr_t)d ; +} + +static int ptr_cmp (void const *a, void const *b, void *x) +{ + (void)x ; + return a < b ? -1 : a > b ; +} + + + /* fd reference counter */ + +typedef struct fdcount_s fdcount_t, *fdcount_t_ref ; +struct fdcount_s +{ + int fd ; + uint32_t n ; +} ; + +static void *fdcount_dtok (uint32_t d, void *x) +{ + return &GENSETDYN_P(fdcount_t, (gensetdyn *)x, d)->fd ; +} + +static int fd_cmp (void const *a, void const *b, void *x) +{ + int fda = *(int *)a ; + int fdb = *(int *)b ; + (void)x ; + return fda < fdb ? -1 : fda > fdb ; +} + +static gensetdyn fdcountblob = GENSETDYN_INIT(fdcount_t, 3, 3, 8) ; +static avltree fdcountmap = AVLTREE_INIT(3, 3, 8, &fdcount_dtok, &fd_cmp, &fdcountblob) ; +#define FDCOUNT(i) GENSETDYN_P(fdcount_t, &fdcountblob, (i)) + +static inline fdcount_t *fdcount_search (int fd) +{ + uint32_t d ; + fdcount_t *p ; + if (avltree_search(&fdcountmap, &fd, &d)) return FDCOUNT(d) ; + if (!gensetdyn_new(&fdcountblob, &d)) dienomem() ; + p = FDCOUNT(d) ; + p->fd = fd ; + p->n = 0 ; + if (!avltree_insert(&fdcountmap, d)) dienomem() ; + return p ; +} + +static void fdcount_closecb (int fd, void *p) +{ + uint32_t d ; + avltree_search(&fdcountmap, &fd, &d) ; + if (!--FDCOUNT(d)->n) fd_close(fd) ; +} + + + /* client */ + +typedef struct client_s client_t, *client_t_ref ; +struct client_s +{ + uint32_t next ; + uint32_t xindexsync ; + uint32_t xindexasync ; + tain_t deadline ; + regex_t idstr_re ; + regex_t subscribe_re ; + regex_t write_re ; + avltree subscribers ; + unixmessage_sender_t asyncout ; + unixconnection_t sync ; + char idstr[SKABUS_PUB_IDSTR_SIZE + 1] ; +} ; + +static genset *clients ; +static unsigned int sentinel ; +static avltreen *clientmap ; +#define CLIENT(i) genset_p(client_t, clients, (i)) +#define numconn (genset_n(clients) - 1) + +static inline void client_free (client_t *c) +{ + fd_close(unixmessage_sender_fd(&c->sync.out)) ; + if (unixmessage_sender_fd(&c->asyncout) >= 0) + fd_close(unixmessage_sender_fd(&c->asyncout)) ; + unixconnection_free(&c->sync) ; + unixmessage_sender_free(&c->asyncout) ; + if (c->idstr[0]) + { + regfree(&c->subscribe_re) ; + regfree(&c->write_re) ; + } + else regfree(&c->idstr_re) ; + avltree_free(&c->subscribers) ; +} + +static void *idstr_dtok (uint32_t d, void *x) +{ + (void)x ; + return CLIENT(d)->idstr ; +} + +static int idstr_cmp (void const *a, void const *b, void *x) +{ + (void)x ; + return strcmp((char const *)a, (char const *)b) ; +} + +static inline void client_delete (uint32_t cc, uint32_t prev) +{ + client_t *c = CLIENT(cc) ; + uint32_t i = CLIENT(sentinel)->next ; + while (i != sentinel) + { + client_t *p = CLIENT(i) ; + avltree_delete(&p->subscribers, (void const *)(uintptr_t)cc) ; + i = p->next ; + } + CLIENT(prev)->next = c->next ; + if (c->idstr[0]) avltreen_delete(clientmap, c->idstr) ; + client_free(c) ; + genset_delete(clients, cc) ; +} + +static void remove (uint32_t *i, uint32_t j) +{ + client_delete(*i, j) ; + *i = j ; +} + +static void client_setdeadline (client_t *c) +{ + tain_t blah ; + tain_half(&blah, &tain_infinite_relative) ; + tain_add_g(&blah, &blah) ; + if (tain_less(&blah, &c->deadline)) + tain_add_g(&c->deadline, &answertto) ; +} + +static inline int client_prepare_iopause (uint32_t i, tain_t *deadline, iopause_fd *x, uint32_t *j) +{ + client_t *c = CLIENT(i) ; + if (tain_less(&c->deadline, deadline)) *deadline = c->deadline ; + if (!unixmessage_sender_isempty(&c->sync.out) || !unixmessage_receiver_isempty(&c->sync.in) || (cont && !unixmessage_receiver_isfull(&c->sync.in))) + { + x[*j].fd = unixmessage_sender_fd(&c->sync.out) ; + x[*j].events = (!unixmessage_receiver_isempty(&c->sync.in) || (cont && !unixmessage_receiver_isfull(&c->sync.in)) ? IOPAUSE_READ : 0) | (!unixmessage_sender_isempty(&c->sync.out) ? IOPAUSE_WRITE : 0) ; + c->xindexsync = (*j)++ ; + } + else c->xindexsync = 0 ; + if (!unixmessage_sender_isempty(&c->asyncout)) + { + x[*j].fd = unixmessage_sender_fd(&c->asyncout) ; + x[*j].events = IOPAUSE_WRITE ; + c->xindexasync = (*j)++ ; + } + else c->xindexasync = 0 ; + return c->xindexsync || c->xindexasync ; +} + +static inline void client_add (int fd, regex_t const *idstr_re, unsigned int flags) +{ + uint32_t i = genset_new(clients) ; + client_t *c = CLIENT(i) ; + unixconnection_init(&c->sync, fd, fd) ; + unixmessage_sender_init_withclosecb(&c->asyncout, -(int)flags - 1, &fdcount_closecb, 0) ; + c->idstr[0] = 0 ; + c->idstr_re = *idstr_re ; + avltree_init(&c->subscribers, 3, 3, 8, &uint32_dtok, &ptr_cmp, 0) ; + tain_add_g(&c->deadline, &answertto) ; + c->next = CLIENT(sentinel)->next ; + CLIENT(sentinel)->next = i ; +} + +static int enqueue_message (uint32_t dd, unixmessage_t const *m) +{ + client_t *d = CLIENT(dd) ; + if (!unixmessage_put_and_close(&d->asyncout, m, unixmessage_bits_closeall)) return 0 ; + client_setdeadline(d) ; + for (unsigned int i = 0 ; i < m->nfds ; i++) + fdcount_search(m->fds[i])->n++ ; + return 1 ; +} + +static int unsendmessage_iter (uint32_t dd, unsigned int h, void *data) +{ + unixmessage_unput(&CLIENT(dd)->asyncout) ; + (void)h ; + (void)data ; + return 1 ; +} + +static int sendmessage_iter (uint32_t dd, unsigned int h, void *data) +{ + (void)h ; + return enqueue_message(dd, (unixmessage_t *)data) ; +} + +static int store_text (char const *s, size_t len) +{ + int fd ; + size_t msgfsdirlen = strlen(msgfsdir) ; + char fn[msgfsdirlen + TAIN_PACK + 10] ; + fn[0] = '@' ; + memcpy(fn + 1, msgfsdir, msgfsdirlen) ; + fn[1 + msgfsdirlen] = '/' ; + tain_pack(fn + msgfsdirlen + 2, &STAMP) ; + memcpy(fn + msgfsdirlen + 2 + TAIN_PACK, ":XXXXXX", 8) ; + fd = mkstemp(fn) ; + if (fd < 0) return fd ; + if (!writenclose_unsafe(fd, s, len)) + { + fd_close(fd) ; + return -1 ; + } + fd_close(fd) ; + fd = open(fn, O_RDONLY) ; /* too bad you can't just lose writing rights */ + unlink_void(fn) ; /* still there as long as fd is open */ + return fd ; +} + +static void fill_msginfo (char *pack, char const *idstr, size_t idlen, uint8_t flags) +{ + uint64_pack_big(pack, serial++) ; + tain_pack(pack + 8, &STAMP) ; + pack[8 + TAIN_PACK] = flags ; + pack[9 + TAIN_PACK] = idlen ; + memcpy(pack + 10 + TAIN_PACK, idstr, idlen + 1) ; +} + +static int announce (char const *what, size_t whatlen, char const *idstr) +{ + size_t idlen = strlen(idstr) ; + int fd ; + char s[MSGINFO_PACK(idlen)] ; + unixmessage_t m = { .s = s, .len = MSGINFO_PACK(idlen), .fds = &fd, .nfds = 1 } ; + if (!*idstr) return 1 ; + fill_msginfo(s, idstr, idlen, 0) ; + fd = store_text(what, whatlen) ; + if (fd < 0) return 0 ; + if (!avltree_iter_withcancel(&CLIENT(sentinel)->subscribers, &sendmessage_iter, &unsendmessage_iter, &m)) return 0 ; + return 1 ; +} + +static inline int client_flush (uint32_t i, iopause_fd const *x) +{ + client_t *c = CLIENT(i) ; + int isflushed = 2 ; + if (c->xindexsync && (x[c->xindexsync].revents & IOPAUSE_WRITE)) + { + if (!unixmessage_sender_flush(&c->sync.out)) + if (error_isagain(errno)) isflushed = 0 ; + else + { + char what[2] = "-" ; + what[1] = errno ; + if (verbosity) strerr_warnwu2sys("unixmessage_sender_flush ", c->idstr) ; + if (!announce(what, 2, c->idstr)) dienomem() ; + return 0 ; + } + else isflushed = 1 ; + } + + if (c->xindexasync && (x[c->xindexasync].revents & IOPAUSE_WRITE)) + { + if (!unixmessage_sender_flush(&c->asyncout)) + if (error_isagain(errno)) isflushed = 0 ; + else + { + char what[2] = "-" ; + what[1] = errno ; + if (verbosity) strerr_warnwu2sys("unixmessage_sender_flush ", c->idstr) ; + if (!announce(what, 2, c->idstr)) dienomem() ; + return 0 ; + } + else isflushed = !!isflushed ; + } + + if (isflushed == 1) tain_add_g(&c->deadline, &tain_infinite_relative) ; + return 1 ; +} + +static int answer (client_t *c, char e) +{ + unixmessage_t m = { .s = &e, .len = 1, .fds = 0, .nfds = 0 } ; + if (!unixmessage_put(&c->sync.out, &m)) return 0 ; + client_setdeadline(c) ; + return 1 ; +} + +static int do_register (uint32_t cc, unixmessage_t const *m) +{ + uint32_t srelen, wrelen, dummy ; + client_t *c = CLIENT(cc) ; + char const *s = m->s ; + size_t len = m->len ; + int r ; + size_t idlen ; + if (len < 12 || m->nfds) return (errno = EPROTO, 0) ; + idlen = (unsigned char)*s++ ; len-- ; + uint32_unpack_big(s, &srelen) ; s += 4 ; len -= 4 ; + uint32_unpack_big(s, &wrelen) ; s += 4 ; len -= 4 ; + if (len != idlen + srelen + wrelen + 3 || s[idlen] || s[idlen + srelen + 1] || s[idlen + srelen + wrelen + 2] || !idlen || !s[idlen-1]) return (errno = EPROTO, 0) ; + if (idlen > SKABUS_PUB_IDSTR_SIZE) return answer(c, ENAMETOOLONG) ; + if (c->idstr[0]) return answer(c, EISCONN) ; + if (regexec(&c->idstr_re, s, 0, 0, 0)) return answer(c, EPERM) ; + if (avltreen_search(clientmap, s, &dummy)) return answer(c, EBUSY) ; + r = regcomp(&c->subscribe_re, s + idlen + 1, REG_EXTENDED | REG_NOSUB) ; + if (r) return answer(c, r == REG_ESPACE ? ENOMEM : EINVAL) ; + r = regcomp(&c->write_re, s + idlen + srelen + 2, REG_EXTENDED | REG_NOSUB) ; + if (r) + { + regfree(&c->subscribe_re) ; + return answer(c, r == REG_ESPACE ? ENOMEM : EINVAL) ; + } + memcpy(c->idstr, s, idlen+1) ; + avltreen_insert(clientmap, cc) ; + if (!announce("+", 1, s)) + { + char e = errno ; + regfree(&c->write_re) ; + regfree(&c->subscribe_re) ; + avltreen_delete(clientmap, c->idstr) ; + return answer(c, e) ; + } + regfree(&c->idstr_re) ; + return answer(c, 0) ; +} + +static int do_list (uint32_t cc, unixmessage_t const *m) +{ + client_t *c = CLIENT(cc) ; + uint32_t dd = CLIENT(sentinel)->next ; + if (m->len || m->nfds) return (errno = EPROTO, 0) ; + unsigned int n = avltreen_len(clientmap) - 1 ; + { + char pack[9] = "" ; + char lens[n] ; + struct iovec v[1+(n<<1)] ; + unixmessage_v_t mreply = { .v = v, .vlen = 1+(n<<1), .fds = 0, .nfds = 0 } ; + unsigned int registered = 0 ; + v[0].iov_base = pack ; v[0].iov_len = 9 ; + for (unsigned int i = 0 ; i < n ; i++) + { + client_t *d = CLIENT(dd) ; + if (d->idstr[0]) + { + size_t len = strlen(d->idstr) ; + lens[i] = (unsigned char)len ; + v[1+(i<<1)].iov_base = lens + i ; + v[1+(i<<1)].iov_len = 1 ; + v[2+(i<<1)].iov_base = d->idstr ; + v[2+(i<<1)].iov_len = len+1 ; + registered++ ; + } + dd = d->next ; + } + uint32_pack_big(pack+1, registered) ; + uint32_pack_big(pack+5, n - registered) ; + if (!unixmessage_putv(&c->sync.out, &mreply)) return answer(c, errno) ; + } + client_setdeadline(c) ; + return 1 ; +} + +static int do_unsubscribe (uint32_t cc, unixmessage_t const *m) +{ + uint32_t dd ; + char const *s = m->s ; + size_t len = m->len ; + client_t *c = CLIENT(cc) ; + if (!len-- || m->nfds) return (errno = EPROTO, 0) ; + if (len != (unsigned int)(unsigned char)s[0] + 1 || s[len]) return (errno = EPROTO, 0) ; + s++ ; + if (!c->idstr[0]) return answer(c, EPERM) ; + if (!avltreen_search(clientmap, s, &dd)) return answer(c, errno) ; + if (!avltree_delete(&CLIENT(dd)->subscribers, (void const *)(uintptr_t)cc)) return answer(c, errno) ; + return answer(c, 0) ; +} + +static int do_subscribe (uint32_t cc, unixmessage_t const *m) +{ + client_t *c = CLIENT(cc) ; + char const *s = m->s ; + size_t len = m->len ; + client_t *d ; + uint32_t dd, dummy ; + if (!len-- || m->nfds) return (errno = EPROTO, 0) ; + if (len != (unsigned int)(unsigned char)s[0] + 1 || s[len]) return (errno = EPROTO, 0) ; + s++ ; + if (!c->idstr[0]) return answer(c, EPERM) ; + if (!avltreen_search(clientmap, s, &dd)) return answer(c, errno) ; + d = CLIENT(dd) ; + if (regexec(&d->subscribe_re, c->idstr, 0, 0, 0)) return answer(c, EPERM) ; + if (!avltree_search(&d->subscribers, c->idstr, &dummy)) + { + if (!avltree_insert(&d->subscribers, cc)) return answer(c, errno) ; + } + return answer(c, 0) ; +} + +static int do_sendpm (uint32_t cc, unixmessage_t const *m) +{ + client_t *c = CLIENT(cc) ; + char const *s = m->s ; + size_t len = m->len ; + size_t idlen ; + uint32_t dd ; + client_t *d ; + if (len < 2) return (errno = EPROTO, 0) ; + idlen = (unsigned char)s[0] ; s++ ; len-- ; + if (len < idlen + 1 || s[idlen]) return (errno = EPROTO, 0) ; + if (idlen > SKABUS_PUB_IDSTR_SIZE) + { + unixmessage_drop(m) ; + return answer(c, ENAMETOOLONG) ; + } + if (m->nfds > UNIXMESSAGE_MAXFDS - 1) + { + unixmessage_drop(m) ; + return answer(c, ENFILE) ; + } + if (!c->idstr[0]) + { + unixmessage_drop(m) ; + return answer(c, EPERM) ; + } + if (!avltreen_search(clientmap, s, &dd)) + { + unixmessage_drop(m) ; + return answer(c, errno) ; + } + s += idlen+1 ; len -= idlen+1 ; + d = CLIENT(dd) ; + if (regexec(&d->write_re, c->idstr, 0, 0, 0)) + { + unixmessage_drop(m) ; + return answer(c, EPERM) ; + } + + { + size_t cidlen = strlen(c->idstr) ; + char pack[1 + MSGINFO_PACK(cidlen)] ; + int fds[1 + m->nfds] ; + unixmessage_t mtosend = { .s = pack + 1, .len = MSGINFO_PACK(cidlen), .fds = fds, .nfds = 1 + m->nfds } ; + fds[0] = store_text(s, len) ; + if (fds[0] < 0) + { + unixmessage_drop(m) ; + return answer(c, errno) ; + } + fill_msginfo(pack + 1, c->idstr, cidlen, 1) ; + for (unsigned int i = 0 ; i < m->nfds ; i++) fds[1+i] = m->fds[i] ; + if (!unixmessage_put_and_close(&d->asyncout, &mtosend, unixmessage_bits_closeall)) + { + unixmessage_drop(m) ; + return answer(c, errno) ; + } + client_setdeadline(d) ; + pack[0] = 0 ; + mtosend.s = pack ; + mtosend.len = 9 ; + mtosend.fds = 0 ; + mtosend.nfds = 0 ; + return enqueue_message(cc, &mtosend) ; + } +} + +static int do_send (uint32_t cc, unixmessage_t const *m) +{ + client_t *c = CLIENT(cc) ; + size_t cidlen = strlen(c->idstr) ; + if (m->nfds > UNIXMESSAGE_MAXFDS - 1) + { + unixmessage_drop(m) ; + return answer(c, ENFILE) ; + } + { + char pack[1 + MSGINFO_PACK(cidlen)] ; + int fds[1 + m->nfds] ; + unixmessage_t mtosend = { .s = pack + 1, .len = MSGINFO_PACK(cidlen), .fds = fds, .nfds = 1 + m->nfds } ; + fds[0] = store_text(m->s, m->len) ; + if (fds[0] < 0) + { + unixmessage_drop(m) ; + return answer(c, errno) ; + } + fill_msginfo(pack + 1, c->idstr, cidlen, 0) ; + for (unsigned int i = 0 ; i < m->nfds ; i++) fds[1+i] = m->fds[i] ; + if (!avltree_iter_withcancel(&c->subscribers, &sendmessage_iter, &unsendmessage_iter, &mtosend)) + { + unixmessage_drop(m) ; + return answer(c, errno) ; + } + pack[0] = 0 ; + mtosend.s = pack ; + mtosend.len = 9 ; + mtosend.fds = 0 ; + mtosend.nfds = 0 ; + return enqueue_message(cc, &mtosend) ; + } +} + +static int do_error (uint32_t cc, unixmessage_t const *m) +{ + (void)cc ; + (void)m ; + return (errno = EPROTO, 0) ; +} + +typedef int parsefunc_t (uint32_t, unixmessage_t const *) ; +typedef parsefunc_t *parsefunc_t_ref ; + +static inline int parse_protocol (unixmessage_t const *m, void *p) +{ + static parsefunc_t_ref const f[7] = + { + &do_send, + &do_sendpm, + &do_subscribe, + &do_unsubscribe, + &do_list, + &do_register, + &do_error + } ; + unixmessage_t mcopy = { .s = m->s + 1, .len = m->len - 1, .fds = m->fds, .nfds = m->nfds } ; + if (!m->len) + { + unixmessage_drop(m) ; + return (errno = EPROTO, 0) ; + } + if (!(*f[byte_chr("!+SULR", 6, m->s[0])])(*(uint32_t *)p, &mcopy)) + { + unixmessage_drop(m) ; + return 0 ; + } + return 1 ; +} + +static inline int client_read (uint32_t cc, iopause_fd const *x) +{ + client_t *c = CLIENT(cc) ; + if (!unixmessage_receiver_isempty(&c->sync.in) || (c->xindexsync && (x[c->xindexsync].revents & IOPAUSE_READ))) + { + if (unixmessage_sender_fd(&c->asyncout) < 0) + { + unixmessage_t m ; + int r = unixmessage_receive(&c->sync.in, &m) ; + if (r < 0) return -1 ; + if (r) + { + unsigned int flags = -(unixmessage_sender_fd(&c->asyncout) + 1) ; + if (!skaclient_server_ack(&m, &c->sync.out, &c->asyncout, SKABUS_PUB_BANNER1, SKABUS_PUB_BANNER1_LEN, SKABUS_PUB_BANNER2, SKABUS_PUB_BANNER2_LEN)) + { + unixmessage_drop(&m) ; + return -1 ; + } + if (!(flags & 1)) unixmessage_receiver_refuse_fds(&c->sync.in) ; + } + } + else + { + int r = unixmessage_handle(&c->sync.in, &parse_protocol, &cc) ; + if (r <= 0) return r ; + } + } + return 1 ; +} + + + /* Environment on new connections */ + +static int makere (regex_t *re, char const *s, char const *var) +{ + size_t varlen = strlen(var) ; + if (str_start(s, var) && (s[varlen] == '=')) + { + int r = regcomp(re, s + varlen + 1, REG_EXTENDED | REG_NOSUB) ; + if (r) + { + if (verbosity) + { + char buf[256] ; + regerror(r, re, buf, 256) ; + strerr_warnw6x("invalid ", var, " value: ", s + varlen + 1, ": ", buf) ; + } + return -1 ; + } + else return 1 ; + } + return 0 ; +} + +static void defaultre (regex_t *re, int flag) +{ + char const *s = flag ? ".*" : "^$" ; + int r = regcomp(re, s, REG_EXTENDED | REG_NOSUB) ; + if (r) + { + char buf[256] ; + regerror(r, re, buf, 256) ; + strerr_diefu4x(100, "compile ", s, " into a regular expression: ", buf) ; + } +} + +static inline int parse_env (char const *const *envp, regex_t *idstr_re, unsigned int *flags) +{ + unsigned int fl = 0 ; + int idstr_done = 0 ; + for (; *envp ; envp++) + { + if (str_start(*envp, "SKABUS_PUB_SENDFDS=")) fl |= 1 ; + if (!idstr_done) + { + idstr_done = makere(idstr_re, *envp, "SKABUS_PUB_ID_REGEX") ; + if (idstr_done < 0) return 0 ; + } + if (idstr_done) return 1 ; + } + if (!idstr_done) defaultre(idstr_re, flagidstrpub) ; + *flags = fl ; + return 1 ; +} + +static inline int new_connection (int fd, regex_t *idstr_re, unsigned int *flags) +{ + s6_accessrules_params_t params = S6_ACCESSRULES_PARAMS_ZERO ; + s6_accessrules_result_t result = S6_ACCESSRULES_ERROR ; + uid_t uid ; + gid_t gid ; + + if (getpeereid(fd, &uid, &gid) < 0) + { + if (verbosity) strerr_warnwu1sys("getpeereid") ; + return 0 ; + } + switch (rulestype) + { + case 0 : + result = S6_ACCESSRULES_ALLOW ; break ; + case 1 : + result = s6_accessrules_uidgid_fs(uid, gid, rules, ¶ms) ; break ; + case 2 : + result = s6_accessrules_uidgid_cdb(uid, gid, &cdbmap, ¶ms) ; break ; + default : break ; + } + if (result != S6_ACCESSRULES_ALLOW) + { + if (verbosity && (result == S6_ACCESSRULES_ERROR)) + strerr_warnw1sys("error while checking rules") ; + return 0 ; + } + if (params.exec.s) + { + stralloc_free(¶ms.exec) ; + if (verbosity) + { + char fmtuid[UID_FMT] ; + char fmtgid[GID_FMT] ; + fmtuid[uid_fmt(fmtuid, uid)] = 0 ; + fmtgid[gid_fmt(fmtgid, gid)] = 0 ; + strerr_warnw4x("unused exec string in rules for uid ", fmtuid, " gid ", fmtgid) ; + } + } + if (params.env.s) + { + size_t n = byte_count(params.env.s, params.env.len, '\0') ; + char const *envp[n+1] ; + if (!env_make(envp, n, params.env.s, params.env.len)) + { + if (verbosity) strerr_warnwu1sys("env_make") ; + stralloc_free(¶ms.env) ; + return 0 ; + } + envp[n] = 0 ; + if (!parse_env(envp, idstr_re, flags)) + { + if (verbosity) strerr_warnwu1sys("parse_env") ; + s6_accessrules_params_free(¶ms) ; + return 0 ; + } + s6_accessrules_params_free(¶ms) ; + } + return 1 ; +} + +int main (int argc, char const *const *argv, char const *const *envp) +{ + int spfd ; + int flag1 = 0 ; + char const *announce_re = "^$" ; + unsigned int maxconn = 40 ; + PROG = "skabus-pubd" ; + + { + subgetopt_t l = SUBGETOPT_ZERO ; + unsigned int t = 0, T = 0 ; + for (;;) + { + int opt = subgetopt_r(argc, argv, "v:Ss1c:k:i:x:t:T:", &l) ; + if (opt == -1) break ; + switch (opt) + { + case 'v' : if (!uint0_scan(l.arg, &verbosity)) dieusage() ; break ; + case 'S' : flagidstrpub = 0 ; break ; + case 's' : flagidstrpub = 1 ; break ; + case '1' : flag1 = 1 ; break ; + case 'k' : announce_re = l.arg ; break ; + case 'i' : rules = l.arg ; rulestype = 1 ; break ; + case 'x' : rules = l.arg ; rulestype = 2 ; break ; + case 't' : if (!uint0_scan(l.arg, &t)) dieusage() ; break ; + case 'T' : if (!uint0_scan(l.arg, &T)) dieusage() ; break ; + case 'c' : if (!uint0_scan(l.arg, &maxconn)) dieusage() ; break ; + default : dieusage() ; + } + } + argc -= l.ind ; argv += l.ind ; + if (t) tain_from_millisecs(&answertto, t) ; + if (T) tain_from_millisecs(&lameduckdeadline, T) ; + } + if (!argc) dieusage() ; + if (maxconn > SKABUS_PUB_MAX) maxconn = SKABUS_PUB_MAX ; + if (!maxconn) maxconn = 1 ; + { + struct stat st ; + if (fstat(0, &st) < 0) strerr_diefu1sys(111, "fstat stdin") ; + if (!S_ISSOCK(st.st_mode)) strerr_dief1x(100, "stdin is not a socket") ; + } + if (flag1) + { + if (fcntl(1, F_GETFD) < 0) + strerr_dief1sys(100, "called with option -1 but stdout said") ; + } + else fd_close(1) ; + { + struct stat st ; + if (stat(argv[0], &st) < 0) + strerr_diefu2sys(111, "stat ", argv[0]) ; + if (!S_ISDIR(st.st_mode)) + { + errno = ENOTDIR ; + strerr_diefu2sys(100, "work in ", argv[0]) ; + } + } + msgfsdir = argv[0] ; + spfd = selfpipe_init() ; + if (spfd < 0) strerr_diefu1sys(111, "selfpipe_init") ; + if (sig_ignore(SIGPIPE) < 0) strerr_diefu1sys(111, "ignore SIGPIPE") ; + { + sigset_t set ; + sigemptyset(&set) ; + sigaddset(&set, SIGTERM) ; + sigaddset(&set, SIGHUP) ; + if (selfpipe_trapset(&set) < 0) strerr_diefu1sys(111, "trap signals") ; + } + + if (rulestype == 2) + { + cdbfd = open_readb(rules) ; + if (cdbfd < 0) strerr_diefu3sys(111, "open ", rules, " for reading") ; + if (cdb_init(&cdbmap, cdbfd) < 0) + strerr_diefu2sys(111, "cdb_init ", rules) ; + } + + { /* I present to you: the stack */ + GENSETB_TYPE(client_t, 1+maxconn) blob ; + iopause_fd x[2 + (maxconn << 1)] ; + AVLTREEN_DECLARE_AND_INIT(blobmap, 1+maxconn, &idstr_dtok, &idstr_cmp, &blob) ; + + GENSETB_init(client_t, &blob, 1+maxconn) ; + clients = &blob.info ; + sentinel = gensetb_new(&blob) ; + blob.storage[sentinel].next = sentinel ; + blob.storage[sentinel].idstr[0] = 0 ; + { + int r = regcomp(&blob.storage[sentinel].subscribe_re, announce_re, REG_EXTENDED | REG_NOSUB) ; + if (r) + { + char buf[256] ; + regerror(r, &blob.storage[sentinel].subscribe_re, buf, 256) ; + strerr_dief4x(100, "invalid control regex: ", announce_re, ": ", buf) ; + } + } + if (regcomp(&blob.storage[sentinel].write_re, "^$", REG_NOSUB)) strerr_diefu1x(100, "regcomp ^$") ; + avltree_init(&blob.storage[sentinel].subscribers, 3, 3, 8, &uint32_dtok, &ptr_cmp, 0) ; + avltreen_insert(&blobmap, sentinel) ; + clientmap = &blobmap ; + x[0].fd = spfd ; x[0].events = IOPAUSE_READ ; + x[1].fd = 0 ; + + if (flag1) + { + fd_write(1, "\n", 1) ; + fd_close(1) ; + } + tain_now_g() ; + + for (;;) + { + tain_t deadline ; + uint32_t i = blob.storage[sentinel].next, j = 2 ; + int r = 1 ; + if (cont) tain_add_g(&deadline, &tain_infinite_relative) ; + else deadline = lameduckdeadline ; + x[1].events = (cont && (numconn < maxconn)) ? IOPAUSE_READ : 0 ; + for (; i != sentinel ; i = blob.storage[i].next) + if (client_prepare_iopause(i, &deadline, x, &j)) r = 0 ; + if (!cont && r) break ; + + r = iopause_g(x, j, &deadline) ; + if (r < 0) strerr_diefu1sys(111, "iopause") ; + + if (!r) + { + for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next) + if (!tain_future(&blob.storage[i].deadline)) + { + char what[2] = "-" ; + what[1] = ETIMEDOUT ; + if (!announce(what, 2, CLIENT(i)->idstr)) dienomem() ; + remove(&i, j) ; + } + continue ; + } + + if (x[0].revents & IOPAUSE_READ) handle_signals() ; + + for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next) + if (!client_flush(i, x)) remove(&i, j) ; + + for (j = sentinel, i = blob.storage[sentinel].next ; i != sentinel ; j = i, i = blob.storage[i].next) + switch (client_read(i, x)) + { + case 0 : errno = 0 ; + case -1 : + case -2 : + { + char what[2] = "-" ; + what[1] = errno ; + if (!announce(what, 2, CLIENT(i)->idstr)) dienomem() ; + } + remove(&i, j) ; + case 1 : break ; + default : errno = EILSEQ ; die() ; + } + + if (x[1].revents & IOPAUSE_READ) + { + regex_t idstr_re ; + unsigned int flags = 0 ; + int fd = ipc_accept_nb(x[1].fd, 0, 0, 0) ; + if (fd < 0) + if (!error_isagain(errno)) strerr_diefu1sys(111, "accept") ; + else continue ; + else if (!new_connection(fd, &idstr_re, &flags)) fd_close(fd) ; + else client_add(fd, &idstr_re, flags) ; + } + } + } + return 0 ; +} |