diff --git a/src/durability/src/dds_durability.c b/src/durability/src/dds_durability.c index 5cfad75937..1dc1617c18 100644 --- a/src/durability/src/dds_durability.c +++ b/src/durability/src/dds_durability.c @@ -22,6 +22,7 @@ #include "dds__writer.h" #include "dds/ddsi/ddsi_endpoint.h" #include "dds/ddsrt/avl.h" +#include "dds/ddsi/ddsi_serdata.h" #define DEFAULT_QOURUM 1 #define DEFAULT_IDENT "durable_support" @@ -236,6 +237,7 @@ static int cmp_proxy_set_reader (const void *a, const void *b) struct proxy_set_reader_t { ddsrt_avl_node_t node; struct proxy_set_reader_key_t key; + dds_entity_t reader; struct dds_rhc *rhc; }; @@ -1444,9 +1446,57 @@ static void dc_process_set_response (struct dc_t *dc, DurableSupport_response *r } } +#if 0 +int dc_deliver_blob () +{ +} +#endif + +/* todo: this function is hared between ds and client */ +static enum ddsi_serdata_kind get_serdata_kind (uint8_t kind) +{ + enum ddsi_serdata_kind result = SDK_EMPTY; + + if (kind == 0) { + result = SDK_EMPTY; + } else if (kind == 1) { + result = SDK_KEY; + } else if (kind == 2) { + result = SDK_DATA; + } else { + DDS_ERROR("Invalid serdata kind %d", kind); + abort(); + } + return result; +} + +/* These are the offsets used in the data responses. + * TODO: These definitions should actually be shared between the ds and the client. + */ +#define RESPONSE_HEADER_OFFSET_WT 8 +#define RESPONSE_HEADER_OFFSET_SEQNUM 16 +#define RESPONSE_HEADER_OFFSET_WRITER_GUID 24 +#define RESPONSE_HEADER_OFFSET_SERDATA_KIND 40 +#define RESPONSE_HEADER_OFFSET_SERDATA 41 + + static void dc_process_data_response (struct dc_t *dc, DurableSupport_response *response) { struct proxy_set_t *proxy_set = NULL; + ddsrt_avl_citer_t it; + const struct ddsi_sertype *sertype; + struct ddsi_serdata *serdata; + uint32_t response_size, serdata_size;; + uint32_t serdata_offset; + ddsrt_iovec_t data_out; + int64_t wt; + uint64_t seqnum; + enum ddsi_serdata_kind serdata_kind; + dds_guid_t wguid; + dds_return_t ret = DDS_RETCODE_OK; + bool autodispose; + struct proxy_set_reader_t *proxy_set_rd; + char id_str[37]; /* TODO: * A DS also has a response reader (by virtue of the CycloneDDS instance it runs). @@ -1468,7 +1518,57 @@ static void dc_process_data_response (struct dc_t *dc, DurableSupport_response * /* A response_set begin has been received. Because data delivery is reliable, * we are sure that we missed no responses, so the data response that we received * must belong to the proxy set. */ -// printf("LH *** inject data from proxy set \"%s.%s\"\n", proxy_set->key.partition, proxy_set->key.tpname); + response_size = response->body._u.data.blob._length; + serdata_offset = ddsrt_fromBE4u(*((uint32_t *)response->body._u.data.blob._buffer)); + wt = ddsrt_fromBE8(*((int64_t *)(response->body._u.data.blob._buffer + RESPONSE_HEADER_OFFSET_WT))); + seqnum = ddsrt_fromBE8u(*((uint64_t *)(response->body._u.data.blob._buffer + RESPONSE_HEADER_OFFSET_SEQNUM))); + memcpy(&wguid.v, response->body._u.data.blob._buffer + RESPONSE_HEADER_OFFSET_WRITER_GUID, 16); + serdata_kind = get_serdata_kind(*((uint8_t *)response->body._u.data.blob._buffer + RESPONSE_HEADER_OFFSET_SERDATA_KIND)); + /* We could now potentially figure out if the response that has been received + * contains fields that we cannot interpret. We can find that out by comparing + * the received response_offset with my own RESPONSE_HEADER_OFFSET_SERDATA. + * If serdata_offset > RESPONSE_HEADER_OFFSET_SERDATA then this is an indication + * that the ds has more fields than this client can interpret. */ + /* get the serdata */ + serdata_size = response_size - serdata_offset; + data_out.iov_len = serdata_size; + data_out.iov_base = response->body._u.data.blob._buffer + serdata_offset; + /* now deliver the data to the local readers that have an interest in the data */ + for (proxy_set_rd = ddsrt_avl_citer_first (&proxy_set_reader_td, &proxy_set->readers, &it); proxy_set_rd; proxy_set_rd = ddsrt_avl_citer_next (&it)) { + dds_entity_t reader = proxy_set_rd->reader; + /* in order to insert the data in the rhc of the reader we first + * need to resolve the type of the reader */ + if ((ret = dds_get_entity_sertype (reader, &sertype)) < 0) { + /* We failed to get the sertype. If it happens I do not consider + * this my problem, it is a problem in CycloneDDS. + * For now I silently ignore the data. After all, I cannot + * deliver the data to this reader! */ + continue; + } + /* get the serdata */ + if ((serdata = ddsi_serdata_from_ser_iov (sertype, serdata_kind, 1, &data_out, serdata_size)) == NULL) { + /* Failed to get the serdata. If it happens I do not consider + * this my problem, it is a problem in CycloneDDS. The only thing + * I can do is to handle this case. For now I silently ignore the data. + */ + goto err_serdata; + } + serdata->sequence_number = seqnum; + serdata->timestamp.v = wt; + memcpy(&serdata->writer_guid, &wguid, 16); + autodispose = false; /* TODO: we have to retrieve the autodispose setting of the writer! */ + if ((ret = dds_reader_store_historical_serdata(reader, wguid, autodispose, serdata)) != DDS_RETCODE_OK) { + DDS_ERROR("Failed to deliver historical data to reader \"%s\" [%s]\n", dc_stringify_id(proxy_set_rd->key.guid.v, id_str), dds_strretcode(ret)); + goto err_store_historical_serdata; + } + ddsi_serdata_to_ser_unref(serdata, &data_out); + } + return; + +err_store_historical_serdata: + ddsi_serdata_to_ser_unref(serdata, &data_out); +err_serdata: + return; } static int dc_process_response (dds_entity_t rd, struct dc_t *dc) @@ -1520,7 +1620,7 @@ static int dc_process_response (dds_entity_t rd, struct dc_t *dc) } /* add the reader tree of readers for this proxy set */ -static void dc_register_reader_to_proxy_set (struct dc_t *dc, struct proxy_set_t *proxy_set, dds_guid_t guid, struct dds_rhc *rhc) +static void dc_register_reader_to_proxy_set (struct dc_t *dc, struct proxy_set_t *proxy_set, dds_guid_t guid, dds_entity_t reader, struct dds_rhc *rhc) { struct proxy_set_reader_t *proxy_set_rd; struct proxy_set_reader_key_t key; @@ -1530,6 +1630,7 @@ static void dc_register_reader_to_proxy_set (struct dc_t *dc, struct proxy_set_t if ((proxy_set_rd = ddsrt_avl_clookup (&proxy_set_reader_td, &proxy_set->readers, &key)) == NULL) { proxy_set_rd = (struct proxy_set_reader_t *)ddsrt_malloc(sizeof(struct proxy_set_reader_t)); proxy_set_rd->key.guid = guid; + proxy_set_rd->reader = reader; proxy_set_rd->rhc = rhc; ddsrt_avl_cinsert(&proxy_set_reader_td, &proxy_set->readers, proxy_set_rd); DDS_CLOG(DDS_LC_DUR, &dc->gv->logconfig, "reader \"%s\" added to proxy set '%s.%s'\n", dc_stringify_id(guid.v, id_str), proxy_set->key.partition, proxy_set->key.tpname); @@ -1597,7 +1698,7 @@ static void dc_create_proxy_sets_for_reader (struct dc_t *dc, dds_entity_t reade * We do this BEFORE we send the request, so that we are sure * that when we receive the response (possibly immediately after * sending the request) the reader and rhc are present */ - dc_register_reader_to_proxy_set(dc, proxy_set, guid, rhc); + dc_register_reader_to_proxy_set(dc, proxy_set, guid, reader, rhc); /* send a request for this proxy set */ if ((rc = dc_com_request_write(dc->com, proxy_set->key.partition, proxy_set->key.tpname)) != DDS_RETCODE_OK) { DDS_ERROR("Failed to publish dc_request for proxy set %s.%s\n", proxy_set->key.partition, proxy_set->key.tpname);