From 69f16596779dcb67cd0f97a9fabb8675f559bdda Mon Sep 17 00:00:00 2001 From: Jeremie Leska Date: Wed, 23 Aug 2023 14:24:09 +0200 Subject: [PATCH] udp-notif UPDATE send subscription-started refactor configured subscriptions functions: csn_send_notif_one to send to one specific receiver. new csn_build_notification function to create a unyte messages. Send subscription-started and subscription-terminated. Increment sent_count on each message. --- src/netconf_subscribed_notifications.c | 160 +++++++++++++++++++------ src/netconf_subscribed_notifications.h | 2 +- 2 files changed, 126 insertions(+), 36 deletions(-) diff --git a/src/netconf_subscribed_notifications.c b/src/netconf_subscribed_notifications.c index 3ec08e05..103de68f 100644 --- a/src/netconf_subscribed_notifications.c +++ b/src/netconf_subscribed_notifications.c @@ -152,49 +152,34 @@ sub_ntf_find_next(struct np2srv_sub_ntf *last, int (*sub_ntf_match_cb)(struct np return NULL; } -int -csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id, - struct timespec timestamp, struct lyd_node **ly_ntf, int use_ntf) +unyte_message_t * +csn_build_notification(struct timespec timestamp, struct lyd_node **ly_ntf) { - char *string_notification = NULL; unyte_message_t *message = NULL; static uint32_t message_id = 0; char *string_to_send = NULL; - struct np2srv_sub_ntf *sub; char *eventtime = NULL; - int rc = SR_ERR_OK; - uint32_t r; - /* find the subscription structure */ ly_time_ts2str(×tamp, &eventtime); + message = (unyte_message_t *)malloc(sizeof *message); + if (!message) { + EINT; + goto cleanup; + } + lyd_print_mem(&string_to_send, *ly_ntf, LYD_XML, LYD_PRINT_WD_ALL | LY_PRINT_SHRINK); - if (asprintf(&string_notification, + if (asprintf((char **)&message->buffer, "" "%s" "%s" "", eventtime, string_to_send) < 0) { EINT; - rc = SR_ERR_NO_MEMORY; - goto cleanup; - } - - sub = sub_ntf_find(nc_sub_id, 0, 0, 0); - if (!sub) { - EINT; - rc = SR_ERR_INTERNAL; - goto cleanup; - } - - message = (unyte_message_t *)malloc(sizeof *message); - if (!message) { - EINT; - rc = SR_ERR_NO_MEMORY; + free(message); goto cleanup; } - message->buffer = string_notification; - message->buffer_len = strlen(string_notification); + message->buffer_len = strlen(message->buffer); /* UDP-notif */ message->version = 0; message->space = 0; @@ -209,19 +194,45 @@ csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id, message->options = NULL; message->options_len = 0; +cleanup: + free(eventtime); + free(string_to_send); + return message; +} + +int +csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id, + struct timespec timestamp, struct lyd_node **ly_ntf, int use_ntf) +{ + unyte_message_t *message = NULL; + struct np2srv_sub_ntf *sub; + int rc = SR_ERR_OK; + uint32_t r; + + sub = sub_ntf_find(nc_sub_id, 0, 0, 0); + if (!sub) { + EINT; + rc = SR_ERR_INTERNAL; + goto cleanup; + } + + message = csn_build_notification(timestamp, ly_ntf); + if (!message) { + EINT; + rc = SR_ERR_NO_MEMORY; + goto cleanup; + } + /* search transport */ for (r = 0; r < recv_info->count; r++) { unyte_send(recv_info->receivers[r].udp.sender, message); + ATOMIC_INC_RELAXED(sub->sent_count); } - free(string_notification); - - ATOMIC_INC_RELAXED(sub->sent_count); + free(message->buffer); cleanup: free(message); - free(eventtime); - free(string_to_send); if (use_ntf) { lyd_free_tree(*ly_ntf); *ly_ntf = NULL; @@ -229,6 +240,50 @@ csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id, return rc; } +static int +csn_send_notif_one(struct csn_receiver *receiver, uint32_t nc_sub_id, const char *type) +{ + unyte_message_t *message = NULL; + const struct ly_ctx *ly_ctx; + struct np2srv_sub_ntf *sub; + struct lyd_node *ly_ntf; + char notif_string[128]; + int rc = SR_ERR_OK; + char buf[11]; + + ly_ctx = sr_acquire_context(np2srv.sr_conn); + sr_release_context(np2srv.sr_conn); + + sprintf(buf, "%" PRIu32, nc_sub_id); + sprintf(notif_string, "/ietf-subscribed-notifications:subscription-%s/id", type); + lyd_new_path(NULL, ly_ctx, notif_string, buf, 0, &ly_ntf); + + sub = sub_ntf_find(nc_sub_id, 0, 0, 0); + if (!sub) { + EINT; + rc = SR_ERR_INTERNAL; + goto cleanup; + } + + message = csn_build_notification(np_gettimespec(1), &ly_ntf); + if (!message) { + EINT; + rc = SR_ERR_NO_MEMORY; + goto cleanup; + } + + unyte_send(receiver->udp.sender, message); + + free(message->buffer); + + ATOMIC_INC_RELAXED(sub->sent_count); + +cleanup: + free(message); + lyd_free_tree(ly_ntf); + return rc; +} + int sub_ntf_send_notif(struct nc_session *ncs, uint32_t nc_sub_id, struct timespec timestamp, struct lyd_node **ly_ntf, int use_ntf) @@ -574,7 +629,7 @@ csn_receiver_get_by_name(struct csn_receiver_info *recv_info, const char *name) int csn_config_sub_receivers_prepare(const struct lyd_node *node_receiver, - struct csn_receiver_info *recv_info) + struct csn_receiver_info *recv_info, uint32_t nc_sub_id) { struct csn_receiver_config *recv_config = NULL; struct csn_receiver *receiver_search = NULL; @@ -627,6 +682,10 @@ csn_config_sub_receivers_prepare(const struct lyd_node *node_receiver, goto cleanup; } + if (csn_send_notif_one(&receiver, nc_sub_id, "started")) { + WRN("Could not send notification ."); + } + rc = csn_receiver_add(recv_info, &receiver); if (rc) { rc = SR_ERR_NO_MEMORY; @@ -702,15 +761,25 @@ csn_modify_sub_receiver(const struct lyd_node *node_receiver) } if (strcmp(lyd_get_value(node_receiver_ref), receiver->instance_ref)) { + free(receiver->instance_ref); receiver->instance_ref = strdup(lyd_get_value(node_receiver_ref)); + if (csn_send_notif_one(receiver, nc_sub_id, "terminated")) { + WRN("Could not send notification ."); + } + csn_receiver_destroy(receiver, 1); + rc = csn_receiver_start(receiver, recv_config, recv_info); if (rc) { ERR("Cannot init receiver"); goto cleanup; } + + if (csn_send_notif_one(receiver, nc_sub_id, "started")) { + WRN("Could not send notification ."); + } } cleanup: @@ -753,7 +822,7 @@ csn_config_sub_receiver(const struct lyd_node *node_receiver) goto error; } - rc = csn_config_sub_receivers_prepare(node_receiver, recv_info); + rc = csn_config_sub_receivers_prepare(node_receiver, recv_info, nc_sub_id); error: return rc; @@ -791,15 +860,20 @@ csn_receiver_destroy(struct csn_receiver *receiver, int keep_ref) } int -csn_receiver_remove_by_name(struct csn_receiver_info *recv_info, const char *name) +csn_receiver_remove_by_name(struct csn_receiver_info *recv_info, const char *name, uint32_t nc_sub_id) { uint32_t r; for (r = 0; r < recv_info->count; r++) { + if (strcmp(name, recv_info->receivers[r].name)) { continue; } + if (csn_send_notif_one(&recv_info->receivers[r], nc_sub_id, "terminated")) { + WRN("Could not send notification ."); + } + csn_receiver_destroy(&recv_info->receivers[r], 0); recv_info->count--; @@ -861,7 +935,7 @@ csn_delete_sub_receiver(const struct lyd_node *node_receiver) goto error; } - rc = csn_receiver_remove_by_name(recv_info, receiver_name); + rc = csn_receiver_remove_by_name(recv_info, receiver_name, nc_sub_id); error: return rc; @@ -989,10 +1063,15 @@ csn_receivers_restart(struct csn_receiver_config *recv_config) /* restart receivers of this subscription if they match the name */ for (r = 0; r < recv_info->count; r++) { struct csn_receiver *receiver = &recv_info->receivers[r]; + if (strcmp(receiver->instance_ref, recv_config->instance_name)) { continue; } + if (csn_send_notif_one(receiver, info.subs[s].nc_sub_id, "terminated")) { + WRN("Could not send notification ."); + } + csn_receiver_destroy(receiver, 1); rc = csn_receiver_start(receiver, recv_config, recv_info); if (rc) { @@ -1000,6 +1079,9 @@ csn_receivers_restart(struct csn_receiver_config *recv_config) goto cleanup; } + if (csn_send_notif_one(receiver, info.subs[s].nc_sub_id, "started")) { + WRN("Could not send notification ."); + } } } @@ -2323,12 +2405,20 @@ np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *UNUSED(session), uint32_t UNUSED( goto cleanup; } + if (csn_send_notif_one(receiver, nc_sub_id, "terminated")) { + WRN("Could not send notification ."); + } + if (csn_receiver_reset(receiver)) { rc = SR_ERR_INVAL_ARG; ERR("Receiver could not be reset"); goto cleanup; } + if (csn_send_notif_one(receiver, nc_sub_id, "started")) { + WRN("Could not send notification ."); + } + if (output) { ly_time_ts2str(&receiver->reset_time, &time_str); if (lyd_new_term(output, NULL, "time", time_str, 1, NULL)) { diff --git a/src/netconf_subscribed_notifications.h b/src/netconf_subscribed_notifications.h index 06f0f4e6..f947f881 100644 --- a/src/netconf_subscribed_notifications.h +++ b/src/netconf_subscribed_notifications.h @@ -212,7 +212,7 @@ int np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *session, uint32_t sub_id, con */ /** - * @brief Send a notification. + * @brief Send a notification to all receivers of this subscription. * * @param[in] receivers reference to receivers to use. * @param[in] receivers_count the number of receivers.