Skip to content

Commit

Permalink
Data delivery to local readers
Browse files Browse the repository at this point in the history
Signed-off-by: TheFixer <[email protected]>
  • Loading branch information
TheFixer committed Dec 20, 2023
1 parent 8448354 commit f5a490e
Showing 1 changed file with 104 additions and 3 deletions.
107 changes: 104 additions & 3 deletions src/durability/src/dds_durability.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "dds__writer.h"
#include "dds/ddsi/ddsi_endpoint.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsi/ddsi_serdata.h"

#define DEFAULT_QOURUM 1
#define DEFAULT_IDENT "durable_support"
Expand Down Expand Up @@ -236,6 +237,7 @@ static int cmp_proxy_set_reader (const void *a, const void *b)
struct proxy_set_reader_t {
ddsrt_avl_node_t node;
struct proxy_set_reader_key_t key;
dds_entity_t reader;
struct dds_rhc *rhc;
};

Expand Down Expand Up @@ -1444,9 +1446,57 @@ static void dc_process_set_response (struct dc_t *dc, DurableSupport_response *r
}
}

#if 0
int dc_deliver_blob ()
{
}
#endif

/* todo: this function is hared between ds and client */
static enum ddsi_serdata_kind get_serdata_kind (uint8_t kind)
{
enum ddsi_serdata_kind result = SDK_EMPTY;

if (kind == 0) {
result = SDK_EMPTY;
} else if (kind == 1) {
result = SDK_KEY;
} else if (kind == 2) {
result = SDK_DATA;
} else {
DDS_ERROR("Invalid serdata kind %d", kind);
abort();
}
return result;
}

/* These are the offsets used in the data responses.
* TODO: These definitions should actually be shared between the ds and the client.
*/
#define RESPONSE_HEADER_OFFSET_WT 8
#define RESPONSE_HEADER_OFFSET_SEQNUM 16
#define RESPONSE_HEADER_OFFSET_WRITER_GUID 24
#define RESPONSE_HEADER_OFFSET_SERDATA_KIND 40
#define RESPONSE_HEADER_OFFSET_SERDATA 41


static void dc_process_data_response (struct dc_t *dc, DurableSupport_response *response)
{
struct proxy_set_t *proxy_set = NULL;
ddsrt_avl_citer_t it;
const struct ddsi_sertype *sertype;
struct ddsi_serdata *serdata;
uint32_t response_size, serdata_size;;
uint32_t serdata_offset;
ddsrt_iovec_t data_out;
int64_t wt;
uint64_t seqnum;
enum ddsi_serdata_kind serdata_kind;
dds_guid_t wguid;
dds_return_t ret = DDS_RETCODE_OK;
bool autodispose;
struct proxy_set_reader_t *proxy_set_rd;
char id_str[37];

/* TODO:
* A DS also has a response reader (by virtue of the CycloneDDS instance it runs).
Expand All @@ -1468,7 +1518,57 @@ static void dc_process_data_response (struct dc_t *dc, DurableSupport_response *
/* A response_set begin has been received. Because data delivery is reliable,
* we are sure that we missed no responses, so the data response that we received
* must belong to the proxy set. */
// printf("LH *** inject data from proxy set \"%s.%s\"\n", proxy_set->key.partition, proxy_set->key.tpname);
response_size = response->body._u.data.blob._length;
serdata_offset = ddsrt_fromBE4u(*((uint32_t *)response->body._u.data.blob._buffer));
wt = ddsrt_fromBE8(*((int64_t *)(response->body._u.data.blob._buffer + RESPONSE_HEADER_OFFSET_WT)));
seqnum = ddsrt_fromBE8u(*((uint64_t *)(response->body._u.data.blob._buffer + RESPONSE_HEADER_OFFSET_SEQNUM)));
memcpy(&wguid.v, response->body._u.data.blob._buffer + RESPONSE_HEADER_OFFSET_WRITER_GUID, 16);
serdata_kind = get_serdata_kind(*((uint8_t *)response->body._u.data.blob._buffer + RESPONSE_HEADER_OFFSET_SERDATA_KIND));
/* We could now potentially figure out if the response that has been received
* contains fields that we cannot interpret. We can find that out by comparing
* the received response_offset with my own RESPONSE_HEADER_OFFSET_SERDATA.
* If serdata_offset > RESPONSE_HEADER_OFFSET_SERDATA then this is an indication
* that the ds has more fields than this client can interpret. */
/* get the serdata */
serdata_size = response_size - serdata_offset;
data_out.iov_len = serdata_size;
data_out.iov_base = response->body._u.data.blob._buffer + serdata_offset;
/* now deliver the data to the local readers that have an interest in the data */
for (proxy_set_rd = ddsrt_avl_citer_first (&proxy_set_reader_td, &proxy_set->readers, &it); proxy_set_rd; proxy_set_rd = ddsrt_avl_citer_next (&it)) {
dds_entity_t reader = proxy_set_rd->reader;
/* in order to insert the data in the rhc of the reader we first
* need to resolve the type of the reader */
if ((ret = dds_get_entity_sertype (reader, &sertype)) < 0) {
/* We failed to get the sertype. If it happens I do not consider
* this my problem, it is a problem in CycloneDDS.
* For now I silently ignore the data. After all, I cannot
* deliver the data to this reader! */
continue;
}
/* get the serdata */
if ((serdata = ddsi_serdata_from_ser_iov (sertype, serdata_kind, 1, &data_out, serdata_size)) == NULL) {
/* Failed to get the serdata. If it happens I do not consider
* this my problem, it is a problem in CycloneDDS. The only thing
* I can do is to handle this case. For now I silently ignore the data.
*/
goto err_serdata;
}
serdata->sequence_number = seqnum;
serdata->timestamp.v = wt;
memcpy(&serdata->writer_guid, &wguid, 16);
autodispose = false; /* TODO: we have to retrieve the autodispose setting of the writer! */
if ((ret = dds_reader_store_historical_serdata(reader, wguid, autodispose, serdata)) != DDS_RETCODE_OK) {
DDS_ERROR("Failed to deliver historical data to reader \"%s\" [%s]\n", dc_stringify_id(proxy_set_rd->key.guid.v, id_str), dds_strretcode(ret));
goto err_store_historical_serdata;
}
ddsi_serdata_to_ser_unref(serdata, &data_out);
}
return;

err_store_historical_serdata:
ddsi_serdata_to_ser_unref(serdata, &data_out);
err_serdata:
return;
}

static int dc_process_response (dds_entity_t rd, struct dc_t *dc)
Expand Down Expand Up @@ -1520,7 +1620,7 @@ static int dc_process_response (dds_entity_t rd, struct dc_t *dc)
}

/* add the reader tree of readers for this proxy set */
static void dc_register_reader_to_proxy_set (struct dc_t *dc, struct proxy_set_t *proxy_set, dds_guid_t guid, struct dds_rhc *rhc)
static void dc_register_reader_to_proxy_set (struct dc_t *dc, struct proxy_set_t *proxy_set, dds_guid_t guid, dds_entity_t reader, struct dds_rhc *rhc)
{
struct proxy_set_reader_t *proxy_set_rd;
struct proxy_set_reader_key_t key;
Expand All @@ -1530,6 +1630,7 @@ static void dc_register_reader_to_proxy_set (struct dc_t *dc, struct proxy_set_t
if ((proxy_set_rd = ddsrt_avl_clookup (&proxy_set_reader_td, &proxy_set->readers, &key)) == NULL) {
proxy_set_rd = (struct proxy_set_reader_t *)ddsrt_malloc(sizeof(struct proxy_set_reader_t));
proxy_set_rd->key.guid = guid;
proxy_set_rd->reader = reader;
proxy_set_rd->rhc = rhc;
ddsrt_avl_cinsert(&proxy_set_reader_td, &proxy_set->readers, proxy_set_rd);
DDS_CLOG(DDS_LC_DUR, &dc->gv->logconfig, "reader \"%s\" added to proxy set '%s.%s'\n", dc_stringify_id(guid.v, id_str), proxy_set->key.partition, proxy_set->key.tpname);
Expand Down Expand Up @@ -1597,7 +1698,7 @@ static void dc_create_proxy_sets_for_reader (struct dc_t *dc, dds_entity_t reade
* We do this BEFORE we send the request, so that we are sure
* that when we receive the response (possibly immediately after
* sending the request) the reader and rhc are present */
dc_register_reader_to_proxy_set(dc, proxy_set, guid, rhc);
dc_register_reader_to_proxy_set(dc, proxy_set, guid, reader, rhc);
/* send a request for this proxy set */
if ((rc = dc_com_request_write(dc->com, proxy_set->key.partition, proxy_set->key.tpname)) != DDS_RETCODE_OK) {
DDS_ERROR("Failed to publish dc_request for proxy set %s.%s\n", proxy_set->key.partition, proxy_set->key.tpname);
Expand Down

0 comments on commit f5a490e

Please sign in to comment.