Skip to content

Commit

Permalink
udp-notif UPDATE send subscription-started
Browse files Browse the repository at this point in the history
refactor configured subscriptions functions:
csn_send_notif_one to send to one specific receiver.
new csn_build_notification function to create a unyte messages.
Send subscription-started and subscription-terminated.
Increment sent_count on each message.
  • Loading branch information
jeremie6wind committed Sep 11, 2023
1 parent 117379e commit 69f1659
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 36 deletions.
160 changes: 125 additions & 35 deletions src/netconf_subscribed_notifications.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,49 +152,34 @@ sub_ntf_find_next(struct np2srv_sub_ntf *last, int (*sub_ntf_match_cb)(struct np
return NULL;
}

int
csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id,
struct timespec timestamp, struct lyd_node **ly_ntf, int use_ntf)
unyte_message_t *
csn_build_notification(struct timespec timestamp, struct lyd_node **ly_ntf)
{
char *string_notification = NULL;
unyte_message_t *message = NULL;
static uint32_t message_id = 0;
char *string_to_send = NULL;
struct np2srv_sub_ntf *sub;
char *eventtime = NULL;
int rc = SR_ERR_OK;
uint32_t r;

/* find the subscription structure */
ly_time_ts2str(&timestamp, &eventtime);
message = (unyte_message_t *)malloc(sizeof *message);
if (!message) {
EINT;
goto cleanup;
}

lyd_print_mem(&string_to_send, *ly_ntf, LYD_XML, LYD_PRINT_WD_ALL | LY_PRINT_SHRINK);
if (asprintf(&string_notification,
if (asprintf((char **)&message->buffer,
"<notification xmlns:\""NC_NS_NOTIF "\">"
"<eventTime>%s</eventTime>"
"%s"
"</notification>",
eventtime, string_to_send) < 0) {
EINT;
rc = SR_ERR_NO_MEMORY;
goto cleanup;
}

sub = sub_ntf_find(nc_sub_id, 0, 0, 0);
if (!sub) {
EINT;
rc = SR_ERR_INTERNAL;
goto cleanup;
}

message = (unyte_message_t *)malloc(sizeof *message);
if (!message) {
EINT;
rc = SR_ERR_NO_MEMORY;
free(message);
goto cleanup;
}

message->buffer = string_notification;
message->buffer_len = strlen(string_notification);
message->buffer_len = strlen(message->buffer);
/* UDP-notif */
message->version = 0;
message->space = 0;
Expand All @@ -209,26 +194,96 @@ csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id,
message->options = NULL;
message->options_len = 0;

cleanup:
free(eventtime);
free(string_to_send);
return message;
}

int
csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id,
struct timespec timestamp, struct lyd_node **ly_ntf, int use_ntf)
{
unyte_message_t *message = NULL;
struct np2srv_sub_ntf *sub;
int rc = SR_ERR_OK;
uint32_t r;

sub = sub_ntf_find(nc_sub_id, 0, 0, 0);
if (!sub) {
EINT;
rc = SR_ERR_INTERNAL;
goto cleanup;
}

message = csn_build_notification(timestamp, ly_ntf);
if (!message) {
EINT;
rc = SR_ERR_NO_MEMORY;
goto cleanup;
}

/* search transport */
for (r = 0; r < recv_info->count; r++) {
unyte_send(recv_info->receivers[r].udp.sender, message);
ATOMIC_INC_RELAXED(sub->sent_count);
}

free(string_notification);

ATOMIC_INC_RELAXED(sub->sent_count);
free(message->buffer);

cleanup:
free(message);
free(eventtime);
free(string_to_send);
if (use_ntf) {
lyd_free_tree(*ly_ntf);
*ly_ntf = NULL;
}
return rc;
}

static int
csn_send_notif_one(struct csn_receiver *receiver, uint32_t nc_sub_id, const char *type)
{
unyte_message_t *message = NULL;
const struct ly_ctx *ly_ctx;
struct np2srv_sub_ntf *sub;
struct lyd_node *ly_ntf;
char notif_string[128];
int rc = SR_ERR_OK;
char buf[11];

ly_ctx = sr_acquire_context(np2srv.sr_conn);
sr_release_context(np2srv.sr_conn);

sprintf(buf, "%" PRIu32, nc_sub_id);
sprintf(notif_string, "/ietf-subscribed-notifications:subscription-%s/id", type);
lyd_new_path(NULL, ly_ctx, notif_string, buf, 0, &ly_ntf);

sub = sub_ntf_find(nc_sub_id, 0, 0, 0);
if (!sub) {
EINT;
rc = SR_ERR_INTERNAL;
goto cleanup;
}

message = csn_build_notification(np_gettimespec(1), &ly_ntf);
if (!message) {
EINT;
rc = SR_ERR_NO_MEMORY;
goto cleanup;
}

unyte_send(receiver->udp.sender, message);

free(message->buffer);

ATOMIC_INC_RELAXED(sub->sent_count);

cleanup:
free(message);
lyd_free_tree(ly_ntf);
return rc;
}

int
sub_ntf_send_notif(struct nc_session *ncs, uint32_t nc_sub_id, struct timespec timestamp, struct lyd_node **ly_ntf,
int use_ntf)
Expand Down Expand Up @@ -574,7 +629,7 @@ csn_receiver_get_by_name(struct csn_receiver_info *recv_info, const char *name)

int
csn_config_sub_receivers_prepare(const struct lyd_node *node_receiver,
struct csn_receiver_info *recv_info)
struct csn_receiver_info *recv_info, uint32_t nc_sub_id)
{
struct csn_receiver_config *recv_config = NULL;
struct csn_receiver *receiver_search = NULL;
Expand Down Expand Up @@ -627,6 +682,10 @@ csn_config_sub_receivers_prepare(const struct lyd_node *node_receiver,
goto cleanup;
}

if (csn_send_notif_one(&receiver, nc_sub_id, "started")) {
WRN("Could not send notification <subscription-started>.");
}

rc = csn_receiver_add(recv_info, &receiver);
if (rc) {
rc = SR_ERR_NO_MEMORY;
Expand Down Expand Up @@ -702,15 +761,25 @@ csn_modify_sub_receiver(const struct lyd_node *node_receiver)
}

if (strcmp(lyd_get_value(node_receiver_ref), receiver->instance_ref)) {

free(receiver->instance_ref);
receiver->instance_ref = strdup(lyd_get_value(node_receiver_ref));

if (csn_send_notif_one(receiver, nc_sub_id, "terminated")) {
WRN("Could not send notification <subscription-terminated>.");
}

csn_receiver_destroy(receiver, 1);

rc = csn_receiver_start(receiver, recv_config, recv_info);
if (rc) {
ERR("Cannot init receiver");
goto cleanup;
}

if (csn_send_notif_one(receiver, nc_sub_id, "started")) {
WRN("Could not send notification <subscription-started>.");
}
}

cleanup:
Expand Down Expand Up @@ -753,7 +822,7 @@ csn_config_sub_receiver(const struct lyd_node *node_receiver)
goto error;
}

rc = csn_config_sub_receivers_prepare(node_receiver, recv_info);
rc = csn_config_sub_receivers_prepare(node_receiver, recv_info, nc_sub_id);

error:
return rc;
Expand Down Expand Up @@ -791,15 +860,20 @@ csn_receiver_destroy(struct csn_receiver *receiver, int keep_ref)
}

int
csn_receiver_remove_by_name(struct csn_receiver_info *recv_info, const char *name)
csn_receiver_remove_by_name(struct csn_receiver_info *recv_info, const char *name, uint32_t nc_sub_id)
{
uint32_t r;

for (r = 0; r < recv_info->count; r++) {

if (strcmp(name, recv_info->receivers[r].name)) {
continue;
}

if (csn_send_notif_one(&recv_info->receivers[r], nc_sub_id, "terminated")) {
WRN("Could not send notification <subscription-terminated>.");
}

csn_receiver_destroy(&recv_info->receivers[r], 0);

recv_info->count--;
Expand Down Expand Up @@ -861,7 +935,7 @@ csn_delete_sub_receiver(const struct lyd_node *node_receiver)
goto error;
}

rc = csn_receiver_remove_by_name(recv_info, receiver_name);
rc = csn_receiver_remove_by_name(recv_info, receiver_name, nc_sub_id);

error:
return rc;
Expand Down Expand Up @@ -989,17 +1063,25 @@ csn_receivers_restart(struct csn_receiver_config *recv_config)
/* restart receivers of this subscription if they match the name */
for (r = 0; r < recv_info->count; r++) {
struct csn_receiver *receiver = &recv_info->receivers[r];

if (strcmp(receiver->instance_ref, recv_config->instance_name)) {
continue;
}

if (csn_send_notif_one(receiver, info.subs[s].nc_sub_id, "terminated")) {
WRN("Could not send notification <subscription-terminated>.");
}

csn_receiver_destroy(receiver, 1);
rc = csn_receiver_start(receiver, recv_config, recv_info);
if (rc) {
ERR("Cannot init receiver");
goto cleanup;
}

if (csn_send_notif_one(receiver, info.subs[s].nc_sub_id, "started")) {
WRN("Could not send notification <subscription-started>.");
}
}
}

Expand Down Expand Up @@ -2323,12 +2405,20 @@ np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *UNUSED(session), uint32_t UNUSED(
goto cleanup;
}

if (csn_send_notif_one(receiver, nc_sub_id, "terminated")) {
WRN("Could not send notification <subscription-terminated>.");
}

if (csn_receiver_reset(receiver)) {
rc = SR_ERR_INVAL_ARG;
ERR("Receiver could not be reset");
goto cleanup;
}

if (csn_send_notif_one(receiver, nc_sub_id, "started")) {
WRN("Could not send notification <subscription-started>.");
}

if (output) {
ly_time_ts2str(&receiver->reset_time, &time_str);
if (lyd_new_term(output, NULL, "time", time_str, 1, NULL)) {
Expand Down
2 changes: 1 addition & 1 deletion src/netconf_subscribed_notifications.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ int np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *session, uint32_t sub_id, con
*/

/**
* @brief Send a notification.
* @brief Send a notification to all receivers of this subscription.
*
* @param[in] receivers reference to receivers to use.
* @param[in] receivers_count the number of receivers.
Expand Down

0 comments on commit 69f1659

Please sign in to comment.