From 1038275dd526cb7a0e1ff1ab682baa5880d8eb4c Mon Sep 17 00:00:00 2001 From: Matheus Izvekov Date: Tue, 6 Aug 2019 18:00:15 -0300 Subject: [PATCH] RABID-1008 import stomp library into luastompws --- CMakeLists.txt | 23 ++ frame.c | 801 ++++++++++++++++++++++++++++++++++++++++ frame.h | 51 +++ stomp.c | 711 +++++++++++++++++++++++++++++++++++ stomp.h | 116 ++++++ lstompws.c => stompws.c | 147 ++++---- 6 files changed, 1786 insertions(+), 63 deletions(-) create mode 100644 CMakeLists.txt create mode 100644 frame.c create mode 100644 frame.h create mode 100644 stomp.c create mode 100644 stomp.h rename lstompws.c => stompws.c (86%) diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..a91ca9f --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright (C) 2015-2019 CUJO LLC +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +pkg_check_modules(WEBSOCKETS REQUIRED IMPORTED_TARGET libwebsockets) +assert(CONDITION WEBSOCKETS_VERSION VERSION_GREATER_EQUAL "2.4" + MESSAGE "has libwebsockets ${WEBSOCKETS_VERSION} needs 2.4") +add_lua_library(rabid.stompws "stompws.c" "frame.c" "stomp.c") +target_link_libraries(rabid.stompws PRIVATE PkgConfig::WEBSOCKETS) diff --git a/frame.c b/frame.c new file mode 100644 index 0000000..068b7b3 --- /dev/null +++ b/frame.c @@ -0,0 +1,801 @@ +/* + * Copyright 2019 Evgeni Dobrev + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include + +#include + +#include "frame.h" + +enum read_state { + RS_INIT, + RS_CMD, + RS_HDR, + RS_HDR_ESC, + RS_BODY, + RS_DONE, + RS_ERR +}; + +struct frame_hdr { + ptrdiff_t key_offset; + size_t key_len; + ptrdiff_t val_offset; + size_t val_len; +}; + +struct _frame { + void *buf; + size_t buf_len; /* length of data in buf in bytes */ + size_t buf_capacity; /* allocated size in bytes */ + + ptrdiff_t cmd_offset; /* offset in buff to the start of the cmd string */ + size_t cmd_len; /* lenght of cmd string in bytes */ + + struct frame_hdr *hdrs; /* array of struct frame_hdr elements */ + size_t hdrs_len; /* number of elements in the array */ + size_t hdrs_capacity; /* allocated number of struct frame_hdr elements */ + + struct stomp_hdr *stomp_hdrs; /* array of struct stomp_hdr elements */ + size_t stomp_hdrs_len; /* number of elements in the array */ + size_t stomp_hdrs_capacity; /* allocated number of struct stomp_hdr elements */ + + ptrdiff_t body_offset; /* offset in buff to the start of the body */ + size_t body_len; /* length of body in bytes */ + + enum read_state read_state; /* current state of the frame reading state mashine */ + ptrdiff_t tmp_offset; /* current position within buf while reading an incomming frame */ + size_t tmp_len; /* amount of bytes read while reading an incomming frame */ +}; + +/* number of bytes to increase session->buf by + * when adding more data to the frame */ +#define BUFINCLEN 512 + +/* number of struct stomp_hdr structures to add to session->hdrs + * when adding more data to the frame */ +#define HDRINCLEN 4 + +static int parse_content_length(const char *s, size_t *len) +{ + size_t tmp_len; + char *endptr; + const char *nptr = s; + + if (!s) { + errno = EINVAL; + return -1; + } + + errno = 0; + tmp_len = strtoul(nptr, &endptr, 10); + if ((errno == ERANGE ) || (errno != 0 && tmp_len == 0)) { + errno = EINVAL; + return -1; + } + + if (endptr == nptr) { + errno = EINVAL; + return -1; + } + + *len = tmp_len; + + return 0; +} + +frame_t *frame_new() +{ + frame_t *f = calloc(1, sizeof(*f)); + + return f; +} + +void frame_free(frame_t *f) +{ + free(f->stomp_hdrs); + free(f->hdrs); + free(f->buf); + free(f); +} + +void frame_reset(frame_t *f) +{ + void *buf = f->buf; + size_t capacity = f->buf_capacity; + struct frame_hdr *hdrs = f->hdrs; + size_t hdrs_capacity = f->hdrs_capacity; + struct stomp_hdr *stomp_hdrs = f->stomp_hdrs; + size_t stomp_hdrs_capacity = f->stomp_hdrs_capacity; + + memset(f, 0, sizeof(*f)); + memset(hdrs, 0, sizeof(*hdrs)*hdrs_capacity); + memset(stomp_hdrs, 0, sizeof(*stomp_hdrs)*stomp_hdrs_capacity); + + f->buf = buf; + f->buf_capacity = capacity; + f->hdrs = hdrs; + f->hdrs_capacity = hdrs_capacity; + f->stomp_hdrs = stomp_hdrs; + f->stomp_hdrs_capacity = stomp_hdrs_capacity; + f->read_state = RS_INIT; +} + +static size_t buflene(const void *data, size_t len) +{ + char c; + size_t lene = 0; + size_t i; + + for (i = 0; i < len; i++) { + c = *(char*)(data + i); + if (c == '\r' || c == '\n' || + c == ':' || c == '\\' ) { + lene += 2; + } else { + lene += 1; + } + } + + return lene; +} + +static void *frame_alloc(frame_t *f, size_t len) +{ + size_t capacity; + void *buf; + + if (f->buf_capacity - f->buf_len >= len) { + return f->buf + f->buf_len; + } + + capacity = f->buf_capacity + \ + (BUFINCLEN > len ? BUFINCLEN : len); + + buf = realloc(f->buf, capacity); + if (!buf) { + return NULL; + } + + memset(buf + f->buf_len, 0, (capacity - f->buf_len)); + + f->buf = buf; + f->buf_capacity = capacity; + + return f->buf + f->buf_len; +} + +static void *frame_bufcat(frame_t *f, const void *data, size_t len) +{ + void *dest; + + dest = frame_alloc(f, len); + if (!dest) { + return NULL; + } + + dest = memcpy(dest, data, len); + + f->buf_len += len; + + return dest; +} + +static void *frame_bufcate(frame_t *f, const void *data, size_t len) +{ + size_t i; + void *dest; + char c; + char *buf; + size_t buf_len; + size_t lene; + + lene = buflene(data, len); + if (lene == len) { + return frame_bufcat(f, data, len); + } + + dest = frame_alloc(f, lene); + if (!dest) { + return NULL; + } + + for (i = 0; i < len; i++) { + c = *(char *)(data + i); + switch(c){ + case '\r': + buf = "\\r"; + buf_len = 2; + break; + case '\n': + buf = "\\n"; + buf_len = 2; + break; + case ':': + buf = "\\c"; + buf_len = 2; + break; + case '\\': + buf = "\\\\"; + buf_len = 2; + break; + default: + buf = (char *)(data + i); + buf_len = 1; + } + + memcpy(f->buf + f->buf_len, buf, buf_len); + f->buf_len += buf_len; + } + + return dest; +} + +int frame_cmd_set(frame_t *f, const char *cmd) +{ + void *dest; + size_t len; + + if (!f) { + errno = EINVAL; + return -1; + } + + if (f->cmd_len) { + errno = EINVAL; + return -1; + } + + if (!cmd) { + errno = EINVAL; + return -1; + } + + len = strlen(cmd); + if (!len) { + errno = EINVAL; + return -1; + } + + dest = frame_bufcat(f, cmd, len); + if (!dest) { + return -1; + } + + f->cmd_offset = dest - f->buf; + f->cmd_len = len; + + if (!frame_bufcat(f, "\n", 1)) { + return -1; + } + + return 0; +} + +int frame_hdr_add(frame_t *f, const char *key, const char *val) +{ + struct frame_hdr *h; + void *dest; + size_t key_len; + size_t val_len; + + if (!f) { + errno = EINVAL; + return -1; + } + + if (!f->cmd_len) { + errno = EINVAL; + return -1; + } + + // TODO what about zero length body frames + if (f->body_offset) { + errno = EINVAL; + return -1; + } + + if (!key) { + errno = EINVAL; + return -1; + } + + key_len = strlen(key); + + if (!key_len) { + errno = EINVAL; + return -1; + } + + if (!val) { + errno = EINVAL; + return -1; + } + + val_len = strlen(val); + + if (!val_len) { + errno = EINVAL; + return -1; + } + + + if (!(f->hdrs_capacity - f->hdrs_len)) { + size_t capacity = f->hdrs_capacity + HDRINCLEN; + h = realloc(f->hdrs, capacity * sizeof(*h)); + if (!h) { + return -1; + } + + memset(&h[f->hdrs_len], 0, sizeof(*h)*(capacity - f->hdrs_len)); + + f->hdrs = h; + f->hdrs_capacity = capacity; + } + + h = &f->hdrs[f->hdrs_len]; + dest = frame_bufcate(f, key, key_len); + if (!dest) { + return -1; + } + + h->key_offset = dest - f->buf; + + if (!frame_bufcat(f, ":", 1)) { + return -1; + } + + dest = frame_bufcate(f, val, val_len); + if (!dest) { + return -1; + } + + h->val_offset = dest - f->buf; + + if (!frame_bufcat(f, "\n", 1)) { + return -1; + } + + f->hdrs_len += 1; + + return 0; +} + +int frame_hdrs_add(frame_t *f, size_t hdrc, const struct stomp_hdr *hdrs) +{ + size_t i; + const struct stomp_hdr *h; + + if (!hdrs) { + errno = EINVAL; + return -1; + } + + for (i=0; i < hdrc; i++) { + h = &hdrs[i]; + if (frame_hdr_add(f, h->key, h->val)) { + return -1; + } + } + + return 0; +} + +static size_t frame_hdr_get(frame_t *f, const char *key, const char **val) +{ + size_t i; + const struct frame_hdr *h; + for (i=0; i < f->hdrs_len; i++) { + h = &f->hdrs[i]; + if (!strncmp(key, f->buf + h->key_offset, h->key_len)) { + *val = f->buf + h->val_offset; + return h->val_len; + } + } + + return 0; +} + + +int frame_body_set(frame_t *f, const void *data, size_t len) +{ + void *dest; + ptrdiff_t offset; + + if (!f) { + errno = EINVAL; + return -1; + } + + if (!f->cmd_len) { + errno = EINVAL; + return -1; + } + + if (f->body_offset) { + errno = EINVAL; + return -1; + } + + /* end of headers */ + if (!frame_bufcat(f, "\n", 1)) { + return -1; + } + + dest = frame_bufcat(f, data, len); + if (!dest) { + return -1; + } + + offset = dest - f->buf; + + /* end of frame */ + if (!frame_bufcat(f, "\0", 1)) { + return -1; + } + + f->body_offset = offset; + f->body_len = len; + + return 0; +} + +static enum read_state frame_read_body(frame_t *f, char c) +{ + void *tmp; + enum read_state state = f->read_state; + size_t body_len; + const char *l; + + tmp = frame_bufcat(f, &c, 1); + if (!tmp) { + return RS_ERR; + } + + if (!f->tmp_offset) { + f->tmp_offset = tmp - f->buf; + } + + f->tmp_len += 1; + + if (c != '\0') { + return state; + } + + /* \0 and no content-type -> frame is over */ + /* OR err parsing content-type -> frame is over */ + /* OR loaded all data -> frame is over */ + if (!frame_hdr_get(f, "content-length", &l) || parse_content_length(l, &body_len) || f->tmp_len >= body_len){ + f->body_offset = f->tmp_offset; + f->body_len = f->tmp_len - 1; /* skip last '\0' */ + return RS_DONE; + } + + return state; +} + + +ssize_t frame_write(struct lws* wsi, frame_t *f) +{ + size_t left; + size_t n; + + /* close the frame */ + if (!f->body_offset) { + if (!frame_bufcat(f, "\n\0", 2)) { + return -1; + } + } + + left = f->buf_len; + + unsigned char *ws_buf = calloc(1, LWS_SEND_BUFFER_PRE_PADDING + + left + LWS_SEND_BUFFER_POST_PADDING + 1); + + if (!ws_buf) + return -1; + + memcpy(ws_buf + LWS_SEND_BUFFER_PRE_PADDING, f->buf, left); + + n = lws_write(wsi, &ws_buf[LWS_SEND_BUFFER_PRE_PADDING], left, LWS_WRITE_TEXT); + + /* assert(n == left); */ + /* printf("stomp (frame_write): %zu bytes written\n", n); */ + + free(ws_buf); + + return n; +} + +static enum read_state frame_read_init(frame_t *f, char c) +{ + void *tmp; + enum read_state state = f->read_state; + + switch (c) { + case 'C': /* CONNECTED */ + case 'E': /* ERROR */ + case 'R': /* RECEIPT */ + case 'M': /* MESSAGE */ + state = RS_ERR; + tmp = frame_bufcat(f, &c, 1); + if (tmp) { + state = RS_CMD; + f->tmp_offset = tmp - f->buf; + f->tmp_len = 1; + } + break; + case '\n': /* heart-beat */ + state = RS_DONE; + break; + default: + ; + } + + return state; +} + +static enum read_state frame_read_cmd(frame_t *f, char c) +{ + enum read_state state = f->read_state; + + switch (c) { + case '\r': + break; + case '\0': + state = RS_ERR; + break; + case '\n': + state = RS_ERR; + if (frame_bufcat(f, "\0", 1)) { + if (!strncmp(f->buf + f->tmp_offset, "CONNECTED", f->tmp_len) || + !strncmp(f->buf + f->tmp_offset, "ERROR", f->tmp_len) || + !strncmp(f->buf + f->tmp_offset, "RECEIPT", f->tmp_len) || + !strncmp(f->buf + f->tmp_offset, "MESSAGE", f->tmp_len)) + { + f->cmd_offset = f->tmp_offset; + f->cmd_len = f->tmp_len; + state = RS_HDR; + f->tmp_offset = 0; + f->tmp_len = 0; + } + } + break; + default: + if (!frame_bufcat(f, &c, 1)) { + state = RS_ERR; + } else { + f->tmp_len += 1; + } + } + + return state; +} + +static enum read_state frame_read_hdr(frame_t *f, char c) +{ + struct frame_hdr *h; + void *tmp; + size_t count = f->hdrs_len; + enum read_state state = f->read_state; + + if (!(f->hdrs_capacity - count)) { + size_t capacity = f->hdrs_capacity + HDRINCLEN; + h = realloc(f->hdrs, capacity * sizeof(*h)); + if (!h) { + return RS_ERR; + } + + memset(&h[count], 0, sizeof(*h)*(capacity - count)); + + f->hdrs = h; + f->hdrs_capacity = capacity; + } + + h = &f->hdrs[f->hdrs_len]; + + switch (c) { + case '\0': + state = RS_ERR; + break; + case '\r': + break; + case ':': + if (!frame_bufcat(f, "\0", 1)) { + state = RS_ERR; + } else { + h->key_offset = f->tmp_offset; + h->key_len = f->tmp_len; + f->tmp_offset = 0; + f->tmp_len = 0; + } + break; + case '\n': + if (h->key_len) { + if (!frame_bufcat(f, "\0", 1)) { + state = RS_ERR; + } else { + h->val_offset = f->tmp_offset; + h->val_len = f->tmp_len; + f->hdrs_len += 1; + f->tmp_offset = 0; + f->tmp_len = 0; + } + } else { + state = RS_BODY; + } + break; + case '\\': + state = RS_HDR_ESC; + break; + default: + tmp = frame_bufcat(f, &c, 1); + if (!tmp) { + state = RS_ERR; + } else { + if (!f->tmp_offset) { + f->tmp_offset = tmp - f->buf; + } + f->tmp_len += 1; + } + } + + return state; +} + +static enum read_state frame_read_hdr_esc(frame_t *f, char c) +{ + char *buf; + void *tmp; + + if (c == 'r') { + buf = "\r"; + } else if (c == 'n') { + buf = "\n"; + } else if (c == 'c') { + buf = ":"; + } else if (c == '\\') { + buf = "\\"; + } else { + return RS_ERR; + } + + tmp = frame_bufcat(f, buf, 1); + if (!tmp) { + return RS_ERR; + } + + if (!f->tmp_offset) { + f->tmp_offset = tmp - f->buf; + } + + f->tmp_len += 1; + + return RS_HDR; +} + +int frame_read(const unsigned char* ptr, size_t len, frame_t *f) +{ + unsigned char c = 0; + + size_t pos = 0; + + while (pos < len && f->read_state != RS_ERR && f->read_state != RS_DONE) { + + c = *(ptr+(pos++)); + + switch(f->read_state) { + case RS_INIT: + f->read_state = frame_read_init(f, c); + break; + case RS_CMD: + f->read_state = frame_read_cmd(f, c); + break; + case RS_HDR: + f->read_state = frame_read_hdr(f, c); + break; + case RS_HDR_ESC: + f->read_state = frame_read_hdr_esc(f, c); + break; + case RS_BODY: + f->read_state = frame_read_body(f, c); + break; + default: + return -1; + } + } + + if (f->read_state == RS_ERR) { + return -1; + } + + return 0; +} + +size_t frame_cmd_get(frame_t *f, const char **cmd) +{ + if (!f->cmd_len) { + return 0; + } + + *cmd = f->buf + f->cmd_offset; + return f->cmd_len; +} + +size_t frame_hdrs_get(frame_t *f, const struct stomp_hdr **hdrs) +{ + struct stomp_hdr *h; + size_t i; + + if (!f->hdrs) { + return 0; + } + + if (f->stomp_hdrs_len == f->hdrs_len) { + *hdrs = f->stomp_hdrs; + return f->stomp_hdrs_len; + } + + if (f->hdrs_len > f->stomp_hdrs_capacity) { + h = realloc(f->stomp_hdrs, f->hdrs_len * sizeof(*h)); + if (!h) { + return -1; + } + + f->stomp_hdrs = h; + f->stomp_hdrs_capacity = f->hdrs_len; + } + + for (i=0; i < f->hdrs_len; i++) { + h = &f->stomp_hdrs[i]; + h->key = f->buf + f->hdrs[i].key_offset; + h->val = f->buf + f->hdrs[i].val_offset; + } + + f->stomp_hdrs_len = f->hdrs_len; + + *hdrs = f->stomp_hdrs; + return f->stomp_hdrs_len; +} + +size_t frame_body_get(frame_t *f, const void **body) +{ + if (!f->body_len) { + return 0; + } + + *body = f->buf + f->body_offset; + return f->body_len; +} + diff --git a/frame.h b/frame.h new file mode 100644 index 0000000..ec0eb7c --- /dev/null +++ b/frame.h @@ -0,0 +1,51 @@ +/* + * Copyright 2019 Evgeni Dobrev + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef STOMP_FRAME_H +#define STOMP_FRAME_H + +#include "stomp.h" + +typedef struct _frame frame_t; + +frame_t *frame_new(); +void frame_free(frame_t *f); +void frame_reset(frame_t *f); +int frame_cmd_set(frame_t *f, const char *cmd); +int frame_hdr_add(frame_t *f, const char *key, const char *val); +int frame_hdrs_add(frame_t *f, size_t hdrc, const struct stomp_hdr *hdrs); +int frame_body_set(frame_t *f, const void *body, size_t len); +ssize_t frame_write(struct lws* fd, frame_t *f); + +size_t frame_cmd_get(frame_t *f, const char **cmd); +size_t frame_hdrs_get(frame_t *f, const struct stomp_hdr **hdrs); +size_t frame_body_get(frame_t *f, const void **body); +int frame_read(const unsigned char* buf, size_t len, frame_t *f); + +#endif diff --git a/stomp.c b/stomp.c new file mode 100644 index 0000000..cfbf93e --- /dev/null +++ b/stomp.c @@ -0,0 +1,711 @@ +/* + * Copyright 2019 Evgeni Dobrev + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "frame.h" +#include "stomp.h" + +/* enough space for ULLONG_MAX as string */ +#define ULL_STR_LEN 25 + +struct stomp_callbacks { + void (*connected)(stomp_session_t *, void *, void *); + void (*message)(stomp_session_t *, void *, void *); + void (*error)(stomp_session_t *, void *, void *); + void (*receipt)(stomp_session_t *, void *, void *); +}; + +struct _stomp_session { + struct stomp_callbacks callbacks; /* event callbacks */ + void *ctx; /* ptr to session context */ + frame_t *frame_out; /* library -> broker */ + frame_t *frame_in; /* broker -> library */ + enum stomp_prot protocol; + struct lws *broker_fd; /* pointer to a WS instance */ + int client_id; /* unique ids for subscribe */ + unsigned long client_hb; /* client heartbeat [ms] */ + unsigned long broker_hb; /* broker heartbeat [ms] */ + struct timespec last_write; + struct timespec last_read; + int run; +}; + +static int parse_version(const char *, enum stomp_prot *); +static int parse_heartbeat(const char *, unsigned long *, unsigned long *); +static void on_connected(stomp_session_t *); +static void on_receipt(stomp_session_t *); +static void on_error(stomp_session_t *); +static void on_message(stomp_session_t *); +static const char *hdr_get(size_t, const struct stomp_hdr *, const char *); + +stomp_session_t * +stomp_session_new(void *session_ctx) +{ + stomp_session_t *s; + + if ((s = calloc(1, sizeof(*s))) == NULL) + return (NULL); + + s->ctx = session_ctx; + s->broker_fd = NULL; + + if ((s->frame_out = frame_new()) == NULL) + free(s); + + if ((s->frame_in = frame_new()) == NULL) { + free(s->frame_out); + free(s); + } + + return (s); +} + +void +stomp_session_free(stomp_session_t *s) +{ + frame_free(s->frame_out); + frame_free(s->frame_in); + free(s); +} + +void +stomp_callback_set(stomp_session_t *s, enum stomp_cb_type type, stomp_cb_t cb) +{ + if (s == NULL) + return; + + switch (type) { + case SCB_CONNECTED: + s->callbacks.connected = cb; + break; + case SCB_ERROR: + s->callbacks.error = cb; + break; + case SCB_MESSAGE: + s->callbacks.message = cb; + break; + case SCB_RECEIPT: + s->callbacks.receipt = cb; + break; + default: + return; + } +} + +int +stomp_connect(stomp_session_t *s, struct lws* wsi, size_t hdrc, + const struct stomp_hdr *hdrs) +{ + + unsigned long x = 0, y = 0; + const char *hb; + + hb = hdr_get(hdrc, hdrs, "heart-beat"); + + /* + * Heart-beat is optional, so hb may be NULL without problem. + */ + if (hb != NULL && parse_heartbeat(hb, &x, &y)) { + errno = EINVAL; + return (-1); + } + + s->broker_fd = wsi; + s->run = 1; + + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "CONNECT")) + return (-1); + + s->client_hb = x; + s->broker_hb = y; + + if (frame_hdrs_add(s->frame_out, hdrc, hdrs)) + return (-1); + + if (frame_write(wsi, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + + return (0); +} + +int +stomp_disconnect(stomp_session_t *s, size_t hdrc, const struct stomp_hdr *hdrs) +{ + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "DISCONNECT") || + frame_hdrs_add(s->frame_out, hdrc, hdrs)) + return (-1); + + if (frame_write(s->broker_fd, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + + return (0); +} + +/* TODO enforce different client-ids in case they are provided with hdrs */ +int +stomp_subscribe(stomp_session_t *s, size_t hdrc, const struct stomp_hdr *hdrs) +{ + int client_id = 0; + char buf[ULL_STR_LEN]; + const char *ack; + + if (hdr_get(hdrc, hdrs, "destination") == NULL) { + errno = EINVAL; + return (-1); + } + + ack = hdr_get(hdrc, hdrs, "ack"); + if (ack != NULL && strcmp(ack, "auto") != 0 && + strcmp(ack, "client") != 0 && + strcmp(ack, "client-individual") != 0) { + errno = EINVAL; + return (-1); + } + + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "SUBSCRIBE")) + return (-1); + + if (hdr_get(hdrc, hdrs, "id") == NULL) { + client_id = s->client_id; + if (client_id == INT_MAX) + client_id = 0; + client_id++; + snprintf(buf, ULL_STR_LEN, "%d", client_id); + if (frame_hdr_add(s->frame_out, "id", buf)) + return (-1); + } + + if ((ack == NULL && frame_hdr_add(s->frame_out, "ack", "auto")) || + frame_hdrs_add(s->frame_out, hdrc, hdrs)) + return (-1); + + if (frame_write(s->broker_fd, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + s->client_id = client_id; + + return (client_id); +} + +int +stomp_unsubscribe(stomp_session_t *s, int client_id, size_t hdrc, + const struct stomp_hdr *hdrs) +{ + char buf[ULL_STR_LEN]; + const char *id, *destination; + + id = hdr_get(hdrc, hdrs, "id"); + destination = hdr_get(hdrc, hdrs, "destination"); + + if (s->protocol == SPL_10) { + if (destination == NULL && id == NULL && client_id == 0) { + errno = EINVAL; + return (-1); + } + } else if (id == NULL && client_id == 0) { + errno = EINVAL; + return (-1); + } + + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "UNSUBSCRIBE")) + return (-1); + + /* user provided client id. overrride all other supplied headers */ + if (client_id) { + snprintf(buf, ULL_STR_LEN, "%lu", (unsigned long)client_id); + if (frame_hdr_add(s->frame_out, "id", buf)) + return (-1); + } + + if (frame_hdrs_add(s->frame_out, hdrc, hdrs)) + return (-1); + + if (frame_write(s->broker_fd, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + + return (0); +} + +/* TODO enforce different tx_ids */ +int +stomp_begin(stomp_session_t *s, size_t hdrc, const struct stomp_hdr *hdrs) +{ + if (hdr_get(hdrc, hdrs, "transaction") == NULL) { + errno = EINVAL; + return (-1); + } + + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "BEGIN") || + frame_hdrs_add(s->frame_out, hdrc, hdrs)) + return (-1); + + if (frame_write(s->broker_fd, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + + return (0); +} + +int +stomp_abort(stomp_session_t *s, size_t hdrc, const struct stomp_hdr *hdrs) +{ + if (hdr_get(hdrc, hdrs, "transaction") == NULL) { + errno = EINVAL; + return (-1); + } + + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "ABORT") || + frame_hdrs_add(s->frame_out, hdrc, hdrs)) + return (-1); + + if (frame_write(s->broker_fd, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + + return (0); +} + +int +stomp_ack(stomp_session_t *s, size_t hdrc, const struct stomp_hdr *hdrs) +{ + switch(s->protocol) { + case SPL_12: + if (hdr_get(hdrc, hdrs, "id") == NULL) { + errno = EINVAL; + return (-1); + } + break; + case SPL_11: + if (hdr_get(hdrc, hdrs, "message-id") == NULL || + hdr_get(hdrc, hdrs, "subscription") == NULL) { + errno = EINVAL; + return (-1); + } + if (hdr_get(hdrc, hdrs, "subscription") == NULL) { + errno = EINVAL; + return (-1); + } + break; + default: /* SPL_10 */ + if (hdr_get(hdrc, hdrs, "message-id") == NULL) { + errno = EINVAL; + return (-1); + } + } + + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "ACK") || + frame_hdrs_add(s->frame_out, hdrc, hdrs)) + return (-1); + + if (frame_write(s->broker_fd, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + + return (0); +} + +int +stomp_nack(stomp_session_t *s, size_t hdrc, const struct stomp_hdr *hdrs) +{ + switch(s->protocol) { + case SPL_12: + if (hdr_get(hdrc, hdrs, "id") == NULL) { + errno = EINVAL; + return (-1); + } + break; + case SPL_11: + if (hdr_get(hdrc, hdrs, "message-id") == NULL || + hdr_get(hdrc, hdrs, "subscription") == NULL) { + errno = EINVAL; + return (-1); + } + break; + default: /* SPL_10 */ + errno = EINVAL; + return (-1); + } + + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "NACK") || + frame_hdrs_add(s->frame_out, hdrc, hdrs)) + return (-1); + + if (frame_write(s->broker_fd, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + + return (0); +} + +int +stomp_commit(stomp_session_t *s, size_t hdrc, const struct stomp_hdr *hdrs) +{ + if (hdr_get(hdrc, hdrs, "transaction") == NULL) { + errno = EINVAL; + return (-1); + } + + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "COMMIT") || + frame_hdrs_add(s->frame_out, hdrc, hdrs)) + return (-1); + + if (frame_write(s->broker_fd, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + + return (0); +} + +int +stomp_send(stomp_session_t *s, size_t hdrc, const struct stomp_hdr *hdrs, + const void *body, size_t body_len) +{ + char buf[ULL_STR_LEN]; + const char *len; + + if (hdr_get(hdrc, hdrs, "destination") == NULL) { + errno = EINVAL; + return (-1); + } + + frame_reset(s->frame_out); + + if (frame_cmd_set(s->frame_out, "SEND")) + return (-1); + + /* frames SHOULD include a content-length */ + len = hdr_get(hdrc, hdrs, "content-length"); + if (len == 0) { + snprintf(buf, ULL_STR_LEN, "%lu", (unsigned long)body_len); + if (frame_hdr_add(s->frame_out, "content-length", buf)) + return (-1); + } + + if (frame_hdrs_add(s->frame_out, hdrc, hdrs) || + frame_body_set(s->frame_out, body, body_len)) + return (-1); + + if (frame_write(s->broker_fd, s->frame_out) < 0) { + s->run = 0; + return (-1); + } + + clock_gettime(CLOCK_MONOTONIC, &s->last_write); + + return (0); +} + +int +stomp_recv_cmd(stomp_session_t *s, const unsigned char* buf, size_t len) +{ + frame_t *f = s->frame_in; + size_t cmd_len; + int err; + const char *cmd; + + frame_reset(f); + + if ((err = frame_read(buf, len, f))) + return (-1); + + clock_gettime(CLOCK_MONOTONIC, &s->last_read); + + /* heart-beat */ + if ((cmd_len = frame_cmd_get(f, &cmd)) == 0) + return (0); + + if (strncmp(cmd, "CONNECTED", cmd_len) == 0) + on_connected(s); + else if (strncmp(cmd, "ERROR", cmd_len) == 0) + on_error(s); + else if (strncmp(cmd, "RECEIPT", cmd_len) == 0) + on_receipt(s); + else if (strncmp(cmd, "MESSAGE", cmd_len) == 0) + on_message(s); + else + return (-1); + + return (0); +} + +int +stomp_get_broker_hb(stomp_session_t *s) +{ + return s->broker_hb; +} + +int +stomp_get_client_hb(stomp_session_t *s) +{ + return s->client_hb; +} + +int +stomp_send_heartbeat(stomp_session_t *s) +{ + unsigned char *buf = calloc(1, LWS_SEND_BUFFER_PRE_PADDING + 1 + LWS_SEND_BUFFER_POST_PADDING + 1); + + if (buf == NULL) + return (-1); + + buf[LWS_SEND_BUFFER_PRE_PADDING] = '\n'; + int bytes_sent = lws_write(s->broker_fd, &buf[LWS_SEND_BUFFER_PRE_PADDING], 1, LWS_WRITE_TEXT); + free(buf); + return (bytes_sent == -1 ? -1 : 0); +} + +static int +parse_version(const char *s, enum stomp_prot *v) +{ + enum stomp_prot tmp_v; + + if (s == NULL) { + errno = EINVAL; + return (-1); + } + + if (strncmp(s, "1.2", 3) == 0) + tmp_v = SPL_12; + else if (strncmp(s, "1.1", 3) == 0) + tmp_v = SPL_11; + else if (strncmp(s, "1.0", 3) == 0) + tmp_v = SPL_10; + else + tmp_v = SPL_10; + + *v = tmp_v; + + return (0); +} + +static int +parse_heartbeat(const char *s, unsigned long *x, unsigned long *y) +{ + unsigned long tmp_x, tmp_y; + char *endptr; + const char *nptr = s; + + if (s == NULL) + goto error; + + errno = 0; + tmp_x = strtoul(nptr, &endptr, 10); + if (errno != 0) + goto error; + if (tmp_x == ULONG_MAX) + goto error; + if (endptr == nptr) + goto error; + if (*endptr != ',') + goto error; + + nptr = endptr; + nptr++; + + errno = 0; + tmp_y = strtoul(nptr, &endptr, 10); + if (errno != 0) + goto error; + if (tmp_y == ULONG_MAX) + goto error; + if (endptr == nptr) + goto error; + + *x = tmp_x; + *y = tmp_y; + + return (0); + +error: + errno = EINVAL; + return (-1); +} + +static void +on_connected(stomp_session_t *s) +{ + const struct stomp_hdr *hdrs; + struct stomp_ctx_connected e; + frame_t *f = s->frame_in; + unsigned long x, y; + size_t hdrc; + enum stomp_prot v; + const char *h; + + hdrc = frame_hdrs_get(f, &hdrs); + h = hdr_get(hdrc, hdrs, "version"); + if (h != NULL && !parse_version(h, &v)) + s->protocol = v; + + h = hdr_get(hdrc, hdrs, "heart-beat"); + if (h != NULL && !parse_heartbeat(h, &x, &y)) { + if (s->client_hb == 0 || y == 0) + s->client_hb = 0; + else + s->client_hb = s->client_hb > y ? s->client_hb : y; + + if (s->broker_hb == 0 || x == 0) + s->broker_hb = 0; + else + s->broker_hb = s->broker_hb > x ? s->broker_hb : x; + } else { + s->client_hb = 0; + s->broker_hb = 0; + } + + if (s->callbacks.connected == NULL) + return; + + e.hdrc = hdrc; + e.hdrs = hdrs; + + s->callbacks.connected(s, &e, s->ctx); +} + +static void +on_receipt(stomp_session_t *s) +{ + struct stomp_ctx_receipt e; + frame_t *f = s->frame_in; + + if (s->callbacks.receipt == NULL) + return; + + e.hdrc = frame_hdrs_get(f, &e.hdrs); + + s->callbacks.receipt(s, &e, s->ctx); +} + +static void +on_error(stomp_session_t *s) +{ + struct stomp_ctx_error e; + frame_t *f = s->frame_in; + + if (s->callbacks.error == NULL) + return; + + e.hdrc = frame_hdrs_get(f, &e.hdrs); + e.body_len = frame_body_get(f, &e.body); + + s->callbacks.error(s, &e, s->ctx); +} + +static void +on_message(stomp_session_t *s) +{ + struct stomp_ctx_message e; + frame_t *f = s->frame_in; + + if (s->callbacks.message == NULL) + return; + + e.hdrc = frame_hdrs_get(f, &e.hdrs); + e.body_len = frame_body_get(f, &e.body); + + s->callbacks.message(s, &e, s->ctx); +} + +static const char * +hdr_get(size_t count, const struct stomp_hdr *hdrs, const char *key) +{ + const struct stomp_hdr *h; + size_t i; + + for (i = 0; i < count; i++) { + h = &hdrs[i]; + if (strcmp(key, h->key) == 0) + return (h->val); + } + + return (NULL); +} diff --git a/stomp.h b/stomp.h new file mode 100644 index 0000000..c43aad9 --- /dev/null +++ b/stomp.h @@ -0,0 +1,116 @@ +/* + * Copyright 2019 Evgeni Dobrev + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef STOMP_H +#define STOMP_H + +#include +#include + +typedef struct _stomp_session stomp_session_t; + +struct stomp_hdr { + const char *key; + const char *val; +}; + +struct stomp_ctx_connected { + size_t hdrc; + const struct stomp_hdr *hdrs; +}; + +struct stomp_ctx_receipt { + size_t hdrc; + const struct stomp_hdr *hdrs; +}; + +struct stomp_ctx_error { + size_t hdrc; + const struct stomp_hdr *hdrs; + const void *body; + size_t body_len; +}; +struct stomp_ctx_message { + size_t hdrc; + const struct stomp_hdr *hdrs; + const void *body; + size_t body_len; +}; + +enum stomp_prot { + SPL_10, + SPL_11, + SPL_12, +}; + +enum stomp_cb_type { + SCB_CONNECTED, + SCB_ERROR, + SCB_MESSAGE, + SCB_RECEIPT, +}; + +typedef void(*stomp_cb_t)(stomp_session_t *, void *, void *); + +void stomp_callback_set(stomp_session_t *, enum stomp_cb_type, stomp_cb_t); + +stomp_session_t *stomp_session_new(void *); + +void stomp_session_free(stomp_session_t *); + +int stomp_disconnect(stomp_session_t *, size_t, const struct stomp_hdr *); + +int stomp_subscribe(stomp_session_t *, size_t, const struct stomp_hdr *); + +int stomp_unsubscribe(stomp_session_t *, int, size_t, const struct stomp_hdr *); + +int stomp_begin(stomp_session_t *, size_t, const struct stomp_hdr *); + +int stomp_abort(stomp_session_t *, size_t, const struct stomp_hdr *); + +int stomp_ack(stomp_session_t *, size_t, const struct stomp_hdr *); + +int stomp_nack(stomp_session_t *, size_t, const struct stomp_hdr *); + +int stomp_commit(stomp_session_t *, size_t, const struct stomp_hdr *); + +int stomp_send(stomp_session_t *, size_t, const struct stomp_hdr *, const void *, + size_t); + +int stomp_recv_cmd(stomp_session_t *, const unsigned char*, size_t); + +int stomp_connect(stomp_session_t *, struct lws *,size_t, + const struct stomp_hdr *); + +int stomp_send_heartbeat(stomp_session_t *); +int stomp_get_broker_hb(stomp_session_t *); +int stomp_get_client_hb(stomp_session_t *); + +#endif /* STOMP_H */ diff --git a/lstompws.c b/stompws.c similarity index 86% rename from lstompws.c rename to stompws.c index 4b24ed0..887e970 100644 --- a/lstompws.c +++ b/stompws.c @@ -1,27 +1,19 @@ /* - * Copyright (c) 2018 - 2019, CUJO LLC. - * - * Licensed under the MIT license: - * - * http://www.opensource.org/licenses/mit-license.php - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. + * Copyright (C) 2015-2019 CUJO LLC + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include @@ -32,7 +24,7 @@ #include #include -#include +#include "stomp.h" #define LUASTOMPWS_WSPROT_STOMP "v11.stomp" #define LUASTOMPWS_METATAB_CONN "libwebsocket_context*" @@ -128,6 +120,7 @@ FUNC(stomp_session_t *session, void *msg_ptr, void *conn_ptr) \ } STOMP_CALLBACK(stomp_connectedcallback, "connected", false) +//STOMP_CALLBACK(stomp_receiptcallback , "receipt" , false) STOMP_CALLBACK(stomp_errorcallback , "error" , true) STOMP_CALLBACK(stomp_messagecallback , "message" , true) @@ -400,6 +393,8 @@ writable_handler(lua_State *L, stompws_Connection *conn) stomp_errorcallback); stomp_callback_set(conn->stomp_session, SCB_MESSAGE, stomp_messagecallback); + //stomp_callback_set(conn->stomp_session, SCB_RECEIPT, + // stomp_receiptcallback); struct stomp_hdr headers[] = { { "accept-version", "1.1" }, @@ -490,44 +485,44 @@ websocket_callback(struct lws *wsi, enum lws_callback_reasons reason, assert(luaL_checkudata(L, 1, LUASTOMPWS_METATAB_CONN)); switch (reason) { - case LWS_CALLBACK_CLIENT_ESTABLISHED: - return established_handler(L, conn); - case LWS_CALLBACK_CLOSED: - case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: - close_handler(L, conn, reason != LWS_CALLBACK_CLOSED); - break; - case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: - fprintf(stderr, "websocket_callback peer closed\n"); - break; - case LWS_CALLBACK_CLIENT_RECEIVE: - return receive_handler(L, conn, in, len); - case LWS_CALLBACK_CLIENT_WRITEABLE: - return writable_handler(L, conn); - case LWS_CALLBACK_ADD_POLL_FD: - case LWS_CALLBACK_DEL_POLL_FD: - case LWS_CALLBACK_CHANGE_MODE_POLL_FD: - pollfd_handler(L, conn, reason, - (const struct lws_pollargs *)in); - break; - case LWS_CALLBACK_PROTOCOL_INIT: - case LWS_CALLBACK_PROTOCOL_DESTROY: - case LWS_CALLBACK_CLOSED_CLIENT_HTTP: - case LWS_CALLBACK_GET_THREAD_ID: - case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: - case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: - case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: - case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: - case LWS_CALLBACK_WSI_CREATE: - case LWS_CALLBACK_WSI_DESTROY: - case LWS_CALLBACK_LOCK_POLL: - case LWS_CALLBACK_UNLOCK_POLL: - break; - default: - fprintf(stderr, "websocket_callback unhandled " - "socket:%p reason:%d user:%p in:%p " - "len:%llu\n", - wsi, reason, user, in, len); - break; + case LWS_CALLBACK_CLIENT_ESTABLISHED: + return established_handler(L, conn); + case LWS_CALLBACK_CLOSED: + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + close_handler(L, conn, reason != LWS_CALLBACK_CLOSED); + break; + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: + fprintf(stderr, "websocket_callback peer closed\n"); + break; + case LWS_CALLBACK_CLIENT_RECEIVE: + return receive_handler(L, conn, in, len); + case LWS_CALLBACK_CLIENT_WRITEABLE: + return writable_handler(L, conn); + case LWS_CALLBACK_ADD_POLL_FD: + case LWS_CALLBACK_DEL_POLL_FD: + case LWS_CALLBACK_CHANGE_MODE_POLL_FD: + pollfd_handler(L, conn, reason, + (const struct lws_pollargs *)in); + break; + case LWS_CALLBACK_PROTOCOL_INIT: + case LWS_CALLBACK_PROTOCOL_DESTROY: + case LWS_CALLBACK_CLOSED_CLIENT_HTTP: + case LWS_CALLBACK_GET_THREAD_ID: + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + case LWS_CALLBACK_WSI_CREATE: + case LWS_CALLBACK_WSI_DESTROY: + case LWS_CALLBACK_LOCK_POLL: + case LWS_CALLBACK_UNLOCK_POLL: + break; + default: + fprintf(stderr, "websocket_callback unhandled " + "socket:%p reason:%d user:%p in:%p " + "len:%llu\n", + wsi, reason, user, in, len); + break; } return 0; } @@ -681,6 +676,8 @@ stompws_dispatch(lua_State *L) { stompws_Connection *conn = tolstompws(L); + if (conn->L) luaL_error(L, "cannot call this from within callback"); + if (lua_gettop(L) == 1) { conn->L = L; lws_service_fd(conn->ws_context, NULL); @@ -730,6 +727,29 @@ stompws_send(lua_State *L) return 1; } +/* + * connection:rx_flow_control(boolean) + */ +static int +stompws_rx_flow_control(lua_State *L) +{ + stompws_Connection *conn = tolstompws(L); + if (!conn->ws_socket) luaL_error(L, "socket disconnected"); + const bool enable = lua_toboolean(L, 2); + + /* prepare stack for eventual callback */ + lua_settop(L, 1); + + lua_State *L_backup = conn->L; + conn->L = L; + int err = lws_rx_flow_control(conn->ws_socket, enable); + conn->L = L_backup; + + if (err) luaL_error(L, "rx flow control failed"); + + return 0; +} + /* * connection:get_heartbeat() */ @@ -752,6 +772,7 @@ static const luaL_Reg mth[] = { {"connect", stompws_connect}, {"is_connected", stompws_is_connected}, {"send", stompws_send}, + {"rx_flow_control", stompws_rx_flow_control}, {"dispatch", stompws_dispatch}, {"get_heartbeat", stompws_get_heartbeat}, {NULL, NULL} @@ -763,7 +784,7 @@ static const luaL_Reg lib[] = { }; LUALIB_API int -luaopen_stompws(lua_State *L) +luaopen_rabid_stompws(lua_State *L) { lws_set_log_level(0, NULL);