From b83e4a0a89380480ae92ba216206b8c8d3b995e8 Mon Sep 17 00:00:00 2001 From: Jeremie Leska Date: Thu, 7 Sep 2023 13:47:08 +0200 Subject: [PATCH] udp-notif UPDATE fix configured subscriptions Add missing return code. Add missing function description. Rename some functions. --- src/main.c | 15 +- src/netconf_subscribed_notifications.c | 710 +++++++++++++++---------- src/netconf_subscribed_notifications.h | 34 -- src/subscribed_notifications.c | 1 + src/yang_push.c | 15 +- 5 files changed, 463 insertions(+), 312 deletions(-) diff --git a/src/main.c b/src/main.c index c5db4a5b..91fb4d6b 100644 --- a/src/main.c +++ b/src/main.c @@ -860,15 +860,24 @@ server_data_subscribe(void) SR_CONFIG_SUBSCR(mod_name, "/ietf-subscribed-notifications:subscriptions/receiver-instances/receiver-instance", np2srv_config_receivers_cb); - SR_CONFIG_SUBSCR(mod_name, "/ietf-subscribed-notifications:subscriptions/subscription", - np2srv_config_subscriptions_cb); + rc = sr_module_change_subscribe(np2srv.sr_sess, mod_name, "/ietf-subscribed-notifications:subscriptions/subscription", + np2srv_config_subscriptions_cb, NULL, 0, SR_SUBSCR_ENABLED, &np2srv.sr_data_sub); + if (rc != SR_ERR_OK) { + ERR("Subscribing for \"%s\" data changes failed (%s).", mod_name, sr_strerror(rc)); + goto error; + } SR_CONFIG_SUBSCR(mod_name, "/ietf-subscribed-notifications:subscriptions/subscription/receivers/receiver", np2srv_config_subscriptions_receivers_cb); /* operational data */ SR_OPER_SUBSCR(mod_name, "/ietf-subscribed-notifications:streams", np2srv_oper_sub_ntf_streams_cb); - SR_OPER_SUBSCR(mod_name, "/ietf-subscribed-notifications:subscriptions", np2srv_oper_sub_ntf_subscriptions_cb); + rc = sr_oper_get_subscribe(np2srv.sr_sess, mod_name, "/ietf-subscribed-notifications:subscriptions", + np2srv_oper_sub_ntf_subscriptions_cb, NULL, SR_SUBSCR_OPER_MERGE, &np2srv.sr_data_sub); + if (rc != SR_ERR_OK) { + ERR("Subscribing for providing \"%s\" state data failed (%s).", mod_name, sr_strerror(rc)); + goto error; + } /* * ietf-netconf-server diff --git a/src/netconf_subscribed_notifications.c b/src/netconf_subscribed_notifications.c index 103de68f..dd20bdfa 100644 --- a/src/netconf_subscribed_notifications.c +++ b/src/netconf_subscribed_notifications.c @@ -19,6 +19,7 @@ #include "netconf_subscribed_notifications.h" #include +#include #include #include #include @@ -152,34 +153,42 @@ sub_ntf_find_next(struct np2srv_sub_ntf *last, int (*sub_ntf_match_cb)(struct np return NULL; } -unyte_message_t * +/** + * @brief Build a notification. + * + * @param[in] timestamp any timestamp, mainly the current time. + * @param[in] ly_ntf the notification content to add to the message. + * @return a pointer to a new unyte message on success, NULL on failure. + */ +static unyte_message_t * csn_build_notification(struct timespec timestamp, struct lyd_node **ly_ntf) { unyte_message_t *message = NULL; static uint32_t message_id = 0; char *string_to_send = NULL; char *eventtime = NULL; + int rc = 0; ly_time_ts2str(×tamp, &eventtime); message = (unyte_message_t *)malloc(sizeof *message); if (!message) { - EINT; + EMEM; goto cleanup; } lyd_print_mem(&string_to_send, *ly_ntf, LYD_XML, LYD_PRINT_WD_ALL | LY_PRINT_SHRINK); - if (asprintf((char **)&message->buffer, + if ((rc = asprintf((char **)&message->buffer, "" "%s" "%s" "", - eventtime, string_to_send) < 0) { - EINT; - free(message); + eventtime, string_to_send)) < 0) { + EMEM; goto cleanup; } - message->buffer_len = strlen(message->buffer); + message->buffer_len = rc; + /* UDP-notif */ message->version = 0; message->space = 0; @@ -195,6 +204,11 @@ csn_build_notification(struct timespec timestamp, struct lyd_node **ly_ntf) message->options_len = 0; cleanup: + if (rc < 0) { + free(message); + message = NULL; + } + free(eventtime); free(string_to_send); return message; @@ -218,7 +232,7 @@ csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id, message = csn_build_notification(timestamp, ly_ntf); if (!message) { - EINT; + EMEM; rc = SR_ERR_NO_MEMORY; goto cleanup; } @@ -229,10 +243,12 @@ csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id, ATOMIC_INC_RELAXED(sub->sent_count); } - free(message->buffer); - cleanup: - free(message); + if (message) { + free(message->buffer); + free(message); + } + if (use_ntf) { lyd_free_tree(*ly_ntf); *ly_ntf = NULL; @@ -240,6 +256,14 @@ csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id, return rc; } +/** + * @brief Send a notification to a receiver. + * + * @param[in] receiver the receiver to send the notif. + * @param[in] nc_sub_id the configured subscription id. + * @param[in] type may be started or terminated. + * @return Sysrepo error value. + */ static int csn_send_notif_one(struct csn_receiver *receiver, uint32_t nc_sub_id, const char *type) { @@ -267,19 +291,21 @@ csn_send_notif_one(struct csn_receiver *receiver, uint32_t nc_sub_id, const char message = csn_build_notification(np_gettimespec(1), &ly_ntf); if (!message) { - EINT; + EMEM; rc = SR_ERR_NO_MEMORY; goto cleanup; } unyte_send(receiver->udp.sender, message); - free(message->buffer); - ATOMIC_INC_RELAXED(sub->sent_count); cleanup: - free(message); + if (message) { + free(message->buffer); + free(message); + } + lyd_free_tree(ly_ntf); return rc; } @@ -352,7 +378,7 @@ sub_ntf_add(const struct np2srv_sub_ntf *sub) } /** - * @brief add receiver config to the list of receivers + * @brief Add receiver config to the list of receivers. * * @param[in] recv_config receiver config to add to the list. * @return 0 on success. @@ -456,9 +482,18 @@ np2srv_sub_ntf_destroy(void) INFO_UNLOCK; } +/** + * @brief Establish a new subscription either configured or not. + * + * @param[in] session sysrepo session. + * @param[in] input the data containing the subscription info. + * @param[out] output to return to the netconf subscriber. + * @param[in] ncs the NETCONF session to use. + * @return Sysrepo error value. + */ static int -np2srv_establish_sub_cb(sr_session_ctx_t *session, const struct lyd_node *input, - struct lyd_node *output, struct nc_session *ncs) +np2srv_establish_sub(sr_session_ctx_t *session, const struct lyd_node *input, struct lyd_node *output, + struct nc_session *ncs) { struct lyd_node *node; struct np2srv_sub_ntf sub = {0}, *sub_p; @@ -516,7 +551,7 @@ np2srv_establish_sub_cb(sr_session_ctx_t *session, const struct lyd_node *input, if (lyd_find_path(input, "id", 0, &node)) { ERR("id not found."); - rc = SR_ERR_INTERNAL; + rc = SR_ERR_INVAL_ARG; goto error; } @@ -613,7 +648,14 @@ np2srv_establish_sub_cb(sr_session_ctx_t *session, const struct lyd_node *input, return rc; } -struct csn_receiver * +/** + * @brief Return the receiver info found with its name. + * + * @param[in] recv_info the receiver info of a subscription. + * @param[in] name the name of the receiver. + * @return the found receiver info or NULL. + */ +static struct csn_receiver * csn_receiver_get_by_name(struct csn_receiver_info *recv_info, const char *name) { uint32_t r; @@ -627,8 +669,160 @@ csn_receiver_get_by_name(struct csn_receiver_info *recv_info, const char *name) return NULL; } -int -csn_config_sub_receivers_prepare(const struct lyd_node *node_receiver, +/** + * @brief Destroy content of receiver in a subscription + * + * @param[in] receiver in the subscription in the receiver_info. + * @param[in] keep_ref 0: only the udp connection is destroyed, for restart, 1: everything is deleted. + */ +static void +csn_receiver_destroy(struct csn_receiver *receiver, int keep_ref) +{ + if (!receiver) { + return; + } + + if (!keep_ref) { + free(receiver->name); + receiver->name = NULL; + + free(receiver->instance_ref); + receiver->instance_ref = NULL; + } + + free_sender_socket(receiver->udp.sender); + receiver->udp.sender = NULL; + + free(receiver->udp.options.address); + receiver->udp.options.address = NULL; + + free(receiver->udp.options.port); + receiver->udp.options.port = NULL; + + free(receiver->udp.options.interface); + receiver->udp.options.interface = NULL; + + free(receiver->udp.options.local_address); + receiver->udp.options.local_address = NULL; +} + +/** + * @brief Start a receiver to send udp notification + * + * @param[in,out] receiver the receiver to configure and start. + * @param[in] recv_config the configuration of this receiver. + * @param[in] recv_info, the receiver info structure in the subscription. + * @return 0 on success, -1 on error. + */ +static int +csn_receiver_start(struct csn_receiver *receiver, struct csn_receiver_config *recv_config, + struct csn_receiver_info *recv_info) +{ + receiver->state = CSN_RECEIVER_STATE_CONNECTING; + receiver->reset_time = np_gettimespec(1); + + receiver->udp.options.default_mtu = 1500; + receiver->udp.options.address = strdup(recv_config->udp.address); + if (!receiver->udp.options.address) { + EMEM; + goto error; + } + + receiver->udp.options.port = strdup(recv_config->udp.port); + if (!receiver->udp.options.port) { + EMEM; + goto error; + } + + if (recv_info->local_address) { + receiver->udp.options.local_address = strdup(recv_info->local_address); + if (!receiver->udp.options.local_address) { + EMEM; + goto error; + } + } + + if (recv_info->interface) { + receiver->udp.options.interface = strdup(recv_info->interface); + if (!receiver->udp.options.interface) { + EMEM; + goto error; + } + } + + receiver->udp.sender = unyte_start_sender(&receiver->udp.options); + if (!receiver->udp.sender) { + ERR("Cannot create udp sender: (%s).", strerror(errno)); + goto error; + } + + receiver->state = CSN_RECEIVER_STATE_ACTIVE; + + return 0; + +error: + free(receiver->udp.options.address); + free(receiver->udp.options.port); + free(receiver->udp.options.interface); + free(receiver->udp.options.local_address); + return -1; +} + +/** + * @brief add a receiver in the receiver_info list + * + * @param[in] receiver in the subscription in the receiver_info. + * @param[in] receiver_info in the subscription. + * @return 0 on success, -1 on error. + */ +static int +csn_receiver_add(struct csn_receiver_info *recv_info, struct csn_receiver *receiver) +{ + void *mem; + + mem = realloc(recv_info->receivers, (recv_info->count + 1) * sizeof *receiver); + if (!mem) { + return -1; + } + + recv_info->receivers = mem; + + recv_info->receivers[recv_info->count] = *receiver; + ++recv_info->count; + + return 0; +} + +/** + * @brief Look for a receiver config according to its name. + * + * @param[in] name the name of the receiver config. + * @return the found receiver config or NULL. + */ +static struct csn_receiver_config * +csn_receiver_config_get_by_name(const char *name) +{ + uint32_t r; + + for (r = 0; r < info.recv_cfg_count; r++) { + if (!strcmp(name, info.recv_configs[r].instance_name)) { + return &info.recv_configs[r]; + } + } + + return NULL; +} + +/** + * @brief Add a receiver to a subscription. + * + * @param[in] node_receiver the configuration of the receiver. + * @param[in] recv_info the receivers info of this subscription. + * @param[in] nc_sub_id the subscription id of this configured subscription. + * @return Sysrepo error value. + */ +static int +csn_add_sub_receivers_prepare(const struct lyd_node *node_receiver, struct csn_receiver_info *recv_info, uint32_t nc_sub_id) { struct csn_receiver_config *recv_config = NULL; @@ -639,46 +833,48 @@ csn_config_sub_receivers_prepare(const struct lyd_node *node_receiver, int rc = SR_ERR_OK; if (lyd_find_path(node_receiver, "name", 0, &node_receiver_name)) { - ERR("Missing receiver name"); + ERR("Missing receiver name."); + rc = SR_ERR_INVAL_ARG; goto cleanup; } if (lyd_find_path(node_receiver, "ietf-subscribed-notif-receivers:receiver-instance-ref", 0, &node_receiver_ref)) { - ERR("Missing receiver instance ref"); + ERR("Missing receiver instance ref."); + rc = SR_ERR_INVAL_ARG; goto cleanup; } receiver_search = csn_receiver_get_by_name(recv_info, lyd_get_value(node_receiver_name)); if (receiver_search) { - WRN("Receiver already existing"); + WRN("Receiver already existing."); goto cleanup; } receiver.name = strdup(lyd_get_value(node_receiver_name)); if (!receiver.name) { + EMEM; rc = SR_ERR_NO_MEMORY; - csn_receiver_destroy(&receiver, 0); goto cleanup; } receiver.instance_ref = strdup(lyd_get_value(node_receiver_ref)); if (!receiver.instance_ref) { + EMEM; rc = SR_ERR_NO_MEMORY; - csn_receiver_destroy(&receiver, 0); goto cleanup; } recv_config = csn_receiver_config_get_by_name(lyd_get_value(node_receiver_ref)); if (!recv_config) { - EINT; - csn_receiver_destroy(&receiver, 0); + ERR("Cannot get receiver config."); + rc = SR_ERR_NOT_FOUND; goto cleanup; } rc = csn_receiver_start(&receiver, recv_config, recv_info); if (rc) { - ERR("Cannot init receiver"); - csn_receiver_destroy(&receiver, 0); + ERR("Cannot init receiver."); + rc = SR_ERR_INTERNAL; goto cleanup; } @@ -688,16 +884,26 @@ csn_config_sub_receivers_prepare(const struct lyd_node *node_receiver, rc = csn_receiver_add(recv_info, &receiver); if (rc) { + EMEM; rc = SR_ERR_NO_MEMORY; - ERR("Cannot add receiver"); - csn_receiver_destroy(&receiver, 0); goto cleanup; } + cleanup: + if (rc) { + csn_receiver_destroy(&receiver, 0); + } + return rc; } -int +/** + * @brief Modify a receiver of a subscription. + * + * @param[in] node_receiver the configuration of the receiver. + * @return Sysrepo error value. + */ +static int csn_modify_sub_receiver(const struct lyd_node *node_receiver) { const struct lyd_node *input = lyd_parent(lyd_parent(node_receiver)); @@ -712,7 +918,7 @@ csn_modify_sub_receiver(const struct lyd_node *node_receiver) if (lyd_find_path(input, "id", 0, &node)) { ERR("id not found."); - rc = SR_ERR_INTERNAL; + rc = SR_ERR_INVAL_ARG; goto cleanup; } @@ -737,26 +943,28 @@ csn_modify_sub_receiver(const struct lyd_node *node_receiver) } if (lyd_find_path(node_receiver, "ietf-subscribed-notif-receivers:receiver-instance-ref", 0, &node_receiver_ref)) { - ERR("Missing receiver instance ref"); + ERR("Missing receiver instance ref."); + rc = SR_ERR_INVAL_ARG; goto cleanup; } if (lyd_find_path(node_receiver, "name", 0, &node)) { - rc = SR_ERR_LY; - ERR("Could not find receiver name"); + ERR("Could not find receiver name."); + rc = SR_ERR_INVAL_ARG; goto cleanup; } receiver = csn_receiver_get_by_name(recv_info, lyd_get_value(node)); if (!receiver) { + ERR("Receiver not found."); rc = SR_ERR_INVAL_ARG; - ERR("Receiver not found"); goto cleanup; } recv_config = csn_receiver_config_get_by_name(lyd_get_value(node_receiver_ref)); if (!recv_config) { - EINT; + ERR("Cannot get receiver config."); + rc = SR_ERR_NOT_FOUND; goto cleanup; } @@ -773,7 +981,8 @@ csn_modify_sub_receiver(const struct lyd_node *node_receiver) rc = csn_receiver_start(receiver, recv_config, recv_info); if (rc) { - ERR("Cannot init receiver"); + ERR("Cannot init receiver."); + rc = SR_ERR_INTERNAL; goto cleanup; } @@ -786,8 +995,14 @@ csn_modify_sub_receiver(const struct lyd_node *node_receiver) return rc; } -int -csn_config_sub_receiver(const struct lyd_node *node_receiver) +/** + * @brief add a receiver of a subscription. + * + * @param[in] node_receiver the configuration of the receiver. + * @return Sysrepo error value. + */ +static int +csn_add_sub_receiver(const struct lyd_node *node_receiver) { const struct lyd_node *input = lyd_parent(lyd_parent(node_receiver)); struct csn_receiver_info *recv_info = NULL; @@ -798,7 +1013,7 @@ csn_config_sub_receiver(const struct lyd_node *node_receiver) if (lyd_find_path(input, "id", 0, &node)) { ERR("id not found."); - rc = SR_ERR_INTERNAL; + rc = SR_ERR_INVAL_ARG; goto error; } @@ -806,7 +1021,7 @@ csn_config_sub_receiver(const struct lyd_node *node_receiver) sub = sub_ntf_find(nc_sub_id, 0, 0, 0); if (!sub) { ERR("no such subscription."); - rc = SR_ERR_INTERNAL; + rc = SR_ERR_INVAL_ARG; goto error; } @@ -822,44 +1037,21 @@ csn_config_sub_receiver(const struct lyd_node *node_receiver) goto error; } - rc = csn_config_sub_receivers_prepare(node_receiver, recv_info, nc_sub_id); + rc = csn_add_sub_receivers_prepare(node_receiver, recv_info, nc_sub_id); error: return rc; } -void -csn_receiver_destroy(struct csn_receiver *receiver, int keep_ref) -{ - if (!receiver) { - return; - } - - if (!keep_ref) { - free(receiver->name); - receiver->name = NULL; - - free(receiver->instance_ref); - receiver->instance_ref = NULL; - } - - free_sender_socket(receiver->udp.sender); - receiver->udp.sender = NULL; - - free(receiver->udp.options.address); - receiver->udp.options.address = NULL; - - free(receiver->udp.options.port); - receiver->udp.options.port = NULL; - - free(receiver->udp.options.interface); - receiver->udp.options.interface = NULL; - - free(receiver->udp.options.local_address); - receiver->udp.options.local_address = NULL; -} - -int +/** + * @brief Remove a receiver from a subscription using its name. + * + * @param[in] recv_info the receivers in the subscription. + * @param[in] name the name of the receiver. + * @param[in] nc_sub_id the configured subscription id. + * @return Sysrepo error value. + */ +static int csn_receiver_remove_by_name(struct csn_receiver_info *recv_info, const char *name, uint32_t nc_sub_id) { uint32_t r; @@ -889,7 +1081,13 @@ csn_receiver_remove_by_name(struct csn_receiver_info *recv_info, const char *nam return SR_ERR_INTERNAL; } -int +/** + * @brief Remove a receiver from a subscription. + * + * @param[in] node_receiver the information regarding the receiver. + * @return Sysrepo error value. + */ +static int csn_delete_sub_receiver(const struct lyd_node *node_receiver) { const struct lyd_node *input = lyd_parent(lyd_parent(node_receiver)); @@ -902,15 +1100,15 @@ csn_delete_sub_receiver(const struct lyd_node *node_receiver) if (lyd_find_path(input, "id", 0, &node)) { ERR("id not found."); - rc = SR_ERR_INTERNAL; + rc = SR_ERR_INVAL_ARG; goto error; } nc_sub_id = ((struct lyd_node_term *)node)->value.uint32; if (lyd_find_path(node_receiver, "name", 0, &node)) { - rc = SR_ERR_LY; - ERR("Could not find receiver name"); + ERR("Could not find receiver name."); + rc = SR_ERR_INVAL_ARG; goto error; } @@ -956,39 +1154,55 @@ np2srv_rpc_establish_sub_cb(sr_session_ctx_t *session, uint32_t UNUSED(sub_id), /* find this NETCONF session */ if ((rc = np_find_user_sess(session, __func__, &ncs, NULL))) { - goto error; + return rc; } - return np2srv_establish_sub_cb(session, input, output, ncs); - -error: - - return rc; + return np2srv_establish_sub(session, input, output, ncs); } -static struct lyd_node * +/** + * @brief Helpers to get the actual configured subscription config. + * + * @param[in] session sysrepo session. + * @param[in] nc_sub_id the configured subscription id. + * @return tree of the subscription config on success, NULL on error. + */ +static sr_data_t * get_sr_config_sub_ntf(sr_session_ctx_t *session, uint32_t nc_sub_id) { sr_data_t *data_node; char *xpath; - asprintf(&xpath, "/ietf-subscribed-notifications:subscriptions/subscription[id=%" PRIu32 "]", nc_sub_id); + if (asprintf(&xpath, "/ietf-subscribed-notifications:subscriptions/subscription[id=%" PRIu32 "]", nc_sub_id) == -1) { + EMEM; + return NULL; + } + sr_get_subtree(session, xpath, 0, &data_node); free(xpath); - if (data_node) { - return data_node->tree; - } - - return NULL; + return data_node; } -int -csn_config_sub(sr_session_ctx_t *session, const struct lyd_node *input) +/** + * @brief Establish a new confgured subscription + * + * @param[in] session sysrepo session. + * @param[in] input the data containing the subscription info. + * @return Sysrepo error value. + */ +static int +csn_add_sub(sr_session_ctx_t *session, const struct lyd_node *input) { - return np2srv_establish_sub_cb(session, input, NULL, NULL); + return np2srv_establish_sub(session, input, NULL, NULL); } +/** + * @brief Delete a receiver config from the list. + * + * @param[in] name of the receiver config to remove. + * @return 0 on success, -1 on error. + */ static int csn_receiver_config_remove_by_name(const char *name) { @@ -1018,6 +1232,12 @@ csn_receiver_config_remove_by_name(const char *name) return -1; } +/** + * @brief Delete a receiver config from the list. + * + * @param[in] input the information regarding the receiver. + * @return 0 on success, -1 on error. + */ static int csn_receiver_config_delete(const struct lyd_node *input) { @@ -1025,20 +1245,26 @@ csn_receiver_config_delete(const struct lyd_node *input) int rc = SR_ERR_OK; if (lyd_find_path(input, "name", 0, &name_node)) { - ERR("Missing receiver name\n"); - return -1; + ERR("Missing receiver name."); + return SR_ERR_INVAL_ARG; } /* remove from receivers */ rc = csn_receiver_config_remove_by_name(lyd_get_value(name_node)); if (rc) { - ERR("Cannot remove receiver\n"); - return -1; + ERR("Cannot remove receiver."); + return SR_ERR_INTERNAL; } - return 0; + return rc; } +/** + * @brief restart a receiver of all the subscriptions using it. + * + * @param[in] recv_config the information regarding the receiver. + * @return Sysrepo error value. + */ static int csn_receivers_restart(struct csn_receiver_config *recv_config) { @@ -1075,7 +1301,8 @@ csn_receivers_restart(struct csn_receiver_config *recv_config) csn_receiver_destroy(receiver, 1); rc = csn_receiver_start(receiver, recv_config, recv_info); if (rc) { - ERR("Cannot init receiver"); + ERR("Cannot init receiver."); + rc = SR_ERR_INTERNAL; goto cleanup; } @@ -1089,6 +1316,12 @@ csn_receivers_restart(struct csn_receiver_config *recv_config) return rc; } +/** + * @brief Modify a receiver configuration. + * + * @param[in] input the information regarding the receiver. + * @return Sysrepo error value. + */ static int csn_receiver_config_modify(const struct lyd_node *input) { @@ -1100,22 +1333,22 @@ csn_receiver_config_modify(const struct lyd_node *input) int rc = SR_ERR_OK; if (lyd_find_path(input, "name", 0, &name_node)) { - ERR("Missing receiver name\n"); - rc = SR_ERR_NOT_FOUND; + ERR("Missing receiver name."); + rc = SR_ERR_INVAL_ARG; goto error; } /* get from receivers */ recv_config = csn_receiver_config_get_by_name(lyd_get_value(name_node)); - if (rc) { - ERR("Cannot get receiver config.\n"); + if (!recv_config) { + ERR("Cannot get receiver config."); rc = SR_ERR_NOT_FOUND; goto error; } if (lyd_find_path(input, "ietf-udp-notif-transport:udp-notif-receiver", 0, &receiver_node)) { ERR("Missing mandatory \"udp-notif-receiver\" leave."); - rc = SR_ERR_LY; + rc = SR_ERR_INVAL_ARG; goto error; } @@ -1123,7 +1356,7 @@ csn_receiver_config_modify(const struct lyd_node *input) free(recv_config->udp.address); recv_config->udp.address = strdup(lyd_get_value(remote_address_node)); if (!recv_config->udp.address) { - ERR("Cannot allocate remote address.\n"); + EMEM; rc = SR_ERR_NO_MEMORY; goto error; } @@ -1133,7 +1366,7 @@ csn_receiver_config_modify(const struct lyd_node *input) free(recv_config->udp.port); recv_config->udp.port = strdup(lyd_get_value(remote_port_node)); if (!recv_config->udp.port) { - ERR("Cannot allocate remote port.\n"); + EMEM; rc = SR_ERR_NO_MEMORY; goto error; } @@ -1141,7 +1374,7 @@ csn_receiver_config_modify(const struct lyd_node *input) rc = csn_receivers_restart(recv_config); if (rc) { - ERR("Cannot init receivers"); + ERR("Cannot init receivers."); goto error; } @@ -1177,74 +1410,12 @@ csn_receiver_info_destroy(struct csn_receiver_info *recv_info) recv_info->count = 0; } -struct csn_receiver_config * -csn_receiver_config_get_by_name(const char *name) -{ - uint32_t r; - - for (r = 0; r < info.recv_cfg_count; r++) { - if (!strcmp(name, info.recv_configs[r].instance_name)) { - return &info.recv_configs[r]; - } - } - - return NULL; -} - -int -csn_receiver_start(struct csn_receiver *receiver, struct csn_receiver_config *recv_config, - struct csn_receiver_info *recv_info) -{ - receiver->state = CSN_RECEIVER_STATE_CONNECTING; - receiver->reset_time = np_gettimespec(1); - - receiver->udp.options.default_mtu = 1500; - receiver->udp.options.address = strdup(recv_config->udp.address); - if (!receiver->udp.options.address) { - EMEM; - goto error; - } - - receiver->udp.options.port = strdup(recv_config->udp.port); - if (!receiver->udp.options.port) { - EMEM; - goto error; - } - - if (recv_info->local_address) { - receiver->udp.options.local_address = strdup(recv_info->local_address); - if (!receiver->udp.options.local_address) { - EMEM; - goto error; - } - } - - if (recv_info->interface) { - receiver->udp.options.interface = strdup(recv_info->interface); - if (!receiver->udp.options.interface) { - EMEM; - goto error; - } - } - - receiver->udp.sender = unyte_start_sender(&receiver->udp.options); - if (!receiver->udp.sender) { - ERR("Cannot create udp sender\n"); - goto error; - } - - receiver->state = CSN_RECEIVER_STATE_ACTIVE; - - return 0; - -error: - free(receiver->udp.options.address); - free(receiver->udp.options.port); - free(receiver->udp.options.interface); - free(receiver->udp.options.local_address); - return -1; -} - +/** + * @brief reset a receiver. + * + * @param[in] receiver the receiver to restart. + * @return 0 on success, -1 on failure. + */ static int csn_receiver_reset(struct csn_receiver *receiver) { @@ -1254,7 +1425,7 @@ csn_receiver_reset(struct csn_receiver *receiver) receiver->udp.sender = unyte_start_sender(&receiver->udp.options); if (!receiver->udp.sender) { - ERR("Cannot create udp sender\n"); + ERR("Cannot create udp sender: (%s).", strerror(errno)); goto error; } @@ -1266,24 +1437,12 @@ csn_receiver_reset(struct csn_receiver *receiver) return -1; } -int -csn_receiver_add(struct csn_receiver_info *recv_info, struct csn_receiver *receiver) -{ - void *mem; - - mem = realloc(recv_info->receivers, (recv_info->count + 1) * sizeof *receiver); - if (!mem) { - return -1; - } - - recv_info->receivers = mem; - - recv_info->receivers[recv_info->count] = *receiver; - ++recv_info->count; - - return 0; -} - +/** + * @brief Create a receiver configuration. + * + * @param[in] input the information regarding the receiver config. + * @return Sysrepo error value. + */ static int csn_receiver_config_start(const struct lyd_node *input) { @@ -1295,50 +1454,58 @@ csn_receiver_config_start(const struct lyd_node *input) int rc = SR_ERR_OK; if (lyd_find_path(input, "name", 0, &name_node)) { - ERR("Missing receiver name\n"); + ERR("Missing receiver name."); + rc = SR_ERR_INVAL_ARG; goto error; } recv_config.instance_name = strdup(lyd_get_value(name_node)); if (!recv_config.instance_name) { - ERR("Cannot allocate instance_name\n"); + EMEM; + rc = SR_ERR_NO_MEMORY; goto error; } /* detect type */ if (lyd_find_path(input, "ietf-udp-notif-transport:udp-notif-receiver", 0, &receiver_node)) { ERR("Missing mandatory \"udp-notif-receiver\" leave."); + rc = SR_ERR_INVAL_ARG; goto error; } recv_config.type = CSN_TRANSPORT_UDP; if (lyd_find_path(receiver_node, "remote-address", 0, &remote_address_node)) { - ERR("Missing receiver remote address\n"); + ERR("Missing receiver remote address."); + rc = SR_ERR_INVAL_ARG; goto error; } recv_config.udp.address = strdup(lyd_get_value(remote_address_node)); if (!recv_config.udp.address) { - ERR("Cannot allocate remote address\n"); + EMEM; + rc = SR_ERR_NO_MEMORY; goto error; } if (lyd_find_path(receiver_node, "remote-port", 0, &remote_port_node)) { - ERR("Missing receiver remote port\n"); + ERR("Missing receiver remote port."); + rc = SR_ERR_INVAL_ARG; goto error; } recv_config.udp.port = strdup(lyd_get_value(remote_port_node)); if (!recv_config.udp.port) { - ERR("Cannot allocate remote port\n"); + EMEM; + rc = SR_ERR_NO_MEMORY; goto error; } /* add into receivers, is not accessible before */ rc = csn_receiver_config_add(&recv_config); if (rc) { - ERR("Cannot add receiver\n"); + EMEM; + rc = SR_ERR_NO_MEMORY; goto error; } @@ -1708,7 +1875,14 @@ np2srv_rpc_delete_sub_cb(sr_session_ctx_t *session, uint32_t UNUSED(sub_id), con return rc; } -int +/** + * @brief Delete a configured subscription. + * + * @param[in] session sysrepo session. + * @param[in] input the information regarding the subscription. + * @return Sysrepo error value. + */ +static int csn_delete_sub(sr_session_ctx_t *session, const struct lyd_node *input) { struct np2srv_sub_ntf *sub; @@ -1718,20 +1892,24 @@ csn_delete_sub(sr_session_ctx_t *session, const struct lyd_node *input) uint32_t nc_sub_id; /* id */ - lyd_find_path(input, "id", 0, &node); + if (lyd_find_path(input, "id", 0, &node)) { + ERR("Could not find subscription id."); + return SR_ERR_INVAL_ARG; + } + nc_sub_id = ((struct lyd_node_term *)node)->value.uint32; /* WRITE LOCK */ sub = sub_ntf_find(nc_sub_id, 0, 1, 0); if (!sub) { if (asprintf(&message, "Subscription with ID %" PRIu32 " for the current receiver does not exist.", nc_sub_id) == -1) { - rc = SR_ERR_NO_MEMORY; EMEM; + rc = SR_ERR_NO_MEMORY; return rc; } np_err_ntf_sub_no_such_sub(session, message); - ERR("np_err_ntf_sub_no_such_sub"); + ERR("No such subscription."); rc = SR_ERR_INVAL_ARG; return rc; @@ -1740,7 +1918,7 @@ csn_delete_sub(sr_session_ctx_t *session, const struct lyd_node *input) /* terminate the subscription */ rc = sub_ntf_terminate_sub(sub, NULL); if (rc != SR_ERR_OK) { - ERR("sub_ntf_terminate_sub"); + ERR("Error on subscription termination."); goto cleanup_unlock; } @@ -1916,7 +2094,10 @@ np2srv_config_receivers_cb(sr_session_ctx_t *session, while ((r = sr_get_change_tree_next(session, iter, &op, &node, NULL, NULL, NULL)) == SR_ERR_OK) { if (op == SR_OP_MODIFIED) { - csn_receiver_config_modify(lyd_parent(lyd_parent(node))); + rc = csn_receiver_config_modify(lyd_parent(lyd_parent(node))); + if (rc != SR_ERR_OK) { + goto cleanup; + } } } @@ -1935,16 +2116,20 @@ np2srv_config_receivers_cb(sr_session_ctx_t *session, } int -np2srv_config_subscriptions_cb(sr_session_ctx_t *session, - uint32_t UNUSED(sub_id), const char *UNUSED(module_name), const char *UNUSED(path), - sr_event_t UNUSED(event), uint32_t UNUSED(request_id), void *UNUSED(private_data)) +np2srv_config_subscriptions_cb(sr_session_ctx_t *session, uint32_t UNUSED(sub_id), const char *UNUSED(module_name), + const char *UNUSED(path), sr_event_t event, uint32_t UNUSED(request_id), void *UNUSED(private_data)) { sr_change_iter_t *iter = NULL; const struct lyd_node *node; uint32_t last_nc_sub_id = 0; + sr_data_t *data_node = NULL; int r, rc = SR_ERR_OK; sr_change_oper_t op; + if (event != SR_EV_CHANGE) { + return SR_ERR_OK; + } + /* subscribed-notifications */ rc = sr_get_changes_iter(session, "/ietf-subscribed-notifications:subscriptions/subscription", &iter); if (rc != SR_ERR_OK) { @@ -1953,9 +2138,8 @@ np2srv_config_subscriptions_cb(sr_session_ctx_t *session, } while ((r = sr_get_change_tree_next(session, iter, &op, &node, NULL, NULL, NULL)) == SR_ERR_OK) { - if (op == SR_OP_CREATED) { - rc = csn_config_sub(session, node); + rc = csn_add_sub(session, node); if (rc != SR_ERR_OK) { goto cleanup; } @@ -1966,7 +2150,6 @@ np2srv_config_subscriptions_cb(sr_session_ctx_t *session, } } } - if (r != SR_ERR_NOT_FOUND) { rc = r; ERR("Getting next change failed (%s).", sr_strerror(rc)); @@ -2003,29 +2186,35 @@ np2srv_config_subscriptions_cb(sr_session_ctx_t *session, } /* restart subscription with actual config */ - config = get_sr_config_sub_ntf(session, nc_sub_id); - if (!config) { - VRB("Could not find sub config %lu", nc_sub_id); + data_node = get_sr_config_sub_ntf(session, nc_sub_id); + if (!data_node || !data_node->tree) { + ERR("Could not find sub config %" PRIu32 ".", nc_sub_id); rc = SR_ERR_INTERNAL; goto cleanup; } + config = data_node->tree; + /* delete and create the subscriptions */ - rc = csn_delete_sub(session, lyd_parent(node)); - if (rc != SR_ERR_OK) { - VRB("Could not delete subscription"); + if ((rc = csn_delete_sub(session, lyd_parent(node)))) { + goto cleanup; + } + if ((rc = csn_add_sub(session, config))) { goto cleanup; } - - csn_config_sub(session, config); if (!lyd_find_path(config, "receivers", 0, &node_receivers)) { LY_LIST_FOR(lyd_child(node_receivers), node_receiver) { - csn_config_sub_receiver(node_receiver); + rc = csn_add_sub_receiver(node_receiver); + if (rc != SR_ERR_OK) { + goto cleanup; + } } } /* save id of modified subscription */ last_nc_sub_id = nc_sub_id; + sr_release_data(data_node); + data_node = NULL; } if (r != SR_ERR_NOT_FOUND) { @@ -2035,7 +2224,7 @@ np2srv_config_subscriptions_cb(sr_session_ctx_t *session, } cleanup: - + sr_release_data(data_node); sr_free_change_iter(iter); return rc; } @@ -2058,9 +2247,15 @@ np2srv_config_subscriptions_receivers_cb(sr_session_ctx_t *session, while ((r = sr_get_change_tree_next(session, iter, &op, &node, NULL, NULL, NULL)) == SR_ERR_OK) { if (op == SR_OP_CREATED) { - csn_config_sub_receiver(node); + rc = csn_add_sub_receiver(node); + if (rc) { + goto cleanup; + } } else if (op == SR_OP_DELETED) { - csn_delete_sub_receiver(node); + rc = csn_delete_sub_receiver(node); + if (rc) { + goto cleanup; + } } } @@ -2080,7 +2275,10 @@ np2srv_config_subscriptions_receivers_cb(sr_session_ctx_t *session, while ((r = sr_get_change_tree_next(session, iter, &op, &node, NULL, NULL, NULL)) == SR_ERR_OK) { if (op == SR_OP_MODIFIED) { - csn_modify_sub_receiver(lyd_parent(node)); + rc = csn_modify_sub_receiver(lyd_parent(node)); + if (rc) { + goto cleanup; + } } } @@ -2174,7 +2372,7 @@ np2srv_oper_sub_ntf_streams_cb(sr_session_ctx_t *session, uint32_t UNUSED(sub_id int np2srv_oper_sub_ntf_subscriptions_cb(sr_session_ctx_t *session, uint32_t UNUSED(sub_id), const char *UNUSED(module_name), - const char *UNUSED(path), const char *request_xpath, uint32_t UNUSED(request_id), + const char *UNUSED(path), const char *UNUSED(request_xpath), uint32_t UNUSED(request_id), struct lyd_node **parent, void *UNUSED(private_data)) { const struct ly_ctx *ly_ctx; @@ -2186,11 +2384,6 @@ np2srv_oper_sub_ntf_subscriptions_cb(sr_session_ctx_t *session, uint32_t UNUSED( char *name = NULL; uint32_t id; - if (strstr(request_xpath, "subscriptions/subscription[") && - strstr(request_xpath, "/receivers/receiver[")) { - return np2srv_oper_sub_ntf_receivers_cb(session, 0, NULL, NULL, request_xpath, 0, parent, NULL); - } - /* context is locked while the callback is executing */ ly_ctx = sr_session_acquire_context(session); sr_session_release_context(session); @@ -2356,8 +2549,8 @@ np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *UNUSED(session), uint32_t UNUSED( INFO_WLOCK; if (lyd_find_path(lyd_parent(input), "name", 0, &node)) { - rc = SR_ERR_LY; - ERR("Could not find receiver name"); + ERR("Could not find receiver name."); + rc = SR_ERR_INVAL_ARG; goto cleanup; } @@ -2366,17 +2559,17 @@ np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *UNUSED(session), uint32_t UNUSED( input = lyd_parent(lyd_parent(lyd_parent(input))); if (lyd_find_path(input, "id", 0, &node)) { - rc = SR_ERR_LY; - ERR("Could not find subscription id"); + ERR("Could not find subscription id."); + rc = SR_ERR_INVAL_ARG; goto cleanup; } - nc_sub_id = strtoul(lyd_get_value(node), NULL, 10); + nc_sub_id = ((struct lyd_node_term *)node)->value.uint32; sub = sub_ntf_find(nc_sub_id, 0, 0, 0); if (!sub) { + ERR("Subscription not found."); rc = SR_ERR_INVAL_ARG; - ERR("Subscription not found"); goto cleanup; } @@ -2388,20 +2581,20 @@ np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *UNUSED(session), uint32_t UNUSED( recv_info = yang_push_receivers_info_get(sub->data); break; default: - ERR("Bad subscription type"); + ERR("Bad subscription type."); break; } if (!recv_info) { + ERR("Receiver info not found."); rc = SR_ERR_INVAL_ARG; - ERR("Receiver info not found"); goto cleanup; } receiver = csn_receiver_get_by_name(recv_info, receiver_name); if (!receiver) { + ERR("Receiver not found."); rc = SR_ERR_INVAL_ARG; - ERR("Receiver not found"); goto cleanup; } @@ -2410,8 +2603,8 @@ np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *UNUSED(session), uint32_t UNUSED( } if (csn_receiver_reset(receiver)) { + ERR("Receiver could not be reset."); rc = SR_ERR_INVAL_ARG; - ERR("Receiver could not be reset"); goto cleanup; } @@ -2422,8 +2615,8 @@ np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *UNUSED(session), uint32_t UNUSED( if (output) { ly_time_ts2str(&receiver->reset_time, &time_str); if (lyd_new_term(output, NULL, "time", time_str, 1, NULL)) { + ERR("Could not add time."); rc = SR_ERR_LY; - ERR("Could not add time"); goto cleanup; } } @@ -2436,24 +2629,3 @@ np2srv_rpc_reset_receiver_cb(sr_session_ctx_t *UNUSED(session), uint32_t UNUSED( return rc; } -int -np2srv_oper_sub_ntf_receivers_cb(sr_session_ctx_t *session, uint32_t UNUSED(sub_id), const char *UNUSED(module_name), - const char *UNUSED(path), const char *request_xpath, uint32_t UNUSED(request_id), - struct lyd_node **parent, void *UNUSED(private_data)) -{ - const struct ly_ctx *ly_ctx; - struct lyd_node *root; - - /* context is locked while the callback is executing */ - ly_ctx = sr_session_acquire_context(session); - sr_session_release_context(session); - - if (lyd_new_path(NULL, ly_ctx, request_xpath, NULL, 0, &root)) { - lyd_free_tree(root); - return SR_ERR_LY; - } - - *parent = root; - - return SR_ERR_OK; -} diff --git a/src/netconf_subscribed_notifications.h b/src/netconf_subscribed_notifications.h index f947f881..3c299c3f 100644 --- a/src/netconf_subscribed_notifications.h +++ b/src/netconf_subscribed_notifications.h @@ -232,38 +232,4 @@ int csn_send_notif(struct csn_receiver_info *recv_info, uint32_t nc_sub_id, */ void csn_receiver_info_destroy(struct csn_receiver_info *recv_info); -/** - * @brief Destroy content of receiver in a subscription - * - * @param[in] receiver in the subscription in the receiver_info. - */ -void csn_receiver_destroy(struct csn_receiver *receiver, int keep_ref); - -/** - * @brief start a receiver - * - * @param[in] receiver in the subscription in the receiver_info. - * @param[in] receiver_config is the global receiver config - * @param[in] receiver_info in the subscription. - * @return Sysrepo error value. - */ -int csn_receiver_start(struct csn_receiver *receiver, struct csn_receiver_config *recv_config, - struct csn_receiver_info *recv_info); -/** - * @brief add a receiver in the receiver_info list - * - * @param[in] receiver in the subscription in the receiver_info. - * @param[in] receiver_info in the subscription. - * @return Sysrepo error value. - */ -int csn_receiver_add(struct csn_receiver_info *recv_info, struct csn_receiver *receiver); - -/** - * @brief get a receiver configration from the list - * - * @param[in] name of a receiver configuration. - * @return a config containing receiver connection parameters - */ -struct csn_receiver_config *csn_receiver_config_get_by_name(const char *name); - #endif /* NP2SRV_NETCONF_SUBSCRIBED_NOTIFICATIONS_H_ */ diff --git a/src/subscribed_notifications.c b/src/subscribed_notifications.c index 3f5b6171..f71bd232 100644 --- a/src/subscribed_notifications.c +++ b/src/subscribed_notifications.c @@ -193,6 +193,7 @@ np2srv_rpc_establish_sub_ntf_cb(sr_session_ctx_t *UNUSED(session), uint32_t sub_ * @param[in] ev_sess Event session for reporting errors. * @param[out] sub_ids Generated sysrepo subscription IDs, the first one is used as sub-ntf subscription ID. * @param[out] sub_id_count Number of @p sub_ids. + * @param[in] sr_sub_ctx is the sysrepo context. * @return Sysrepo error value. */ static int diff --git a/src/yang_push.c b/src/yang_push.c index 04d16b98..426794ff 100644 --- a/src/yang_push.c +++ b/src/yang_push.c @@ -562,6 +562,7 @@ np2srv_change_yang_push_cb(sr_session_ctx_t *session, uint32_t UNUSED(sub_id), c * @param[in] ev_sess Event sysrepo session for errors. * @param[in,out] sub_ids Array of SR sub IDs to add to. * @param[in,out] sub_id_count Number of items in @p sub_ids. + * @param[in] sr_sub_ctx is the sysrepo context. * @return Sysrepo error value. */ static int @@ -669,6 +670,7 @@ yang_push_sr_subscribe_filter_collect_mods(const struct ly_ctx *ly_ctx, const ch * @param[in] ev_sess Event session for reporting errors. * @param[out] sub_ids Generated sysrepo subscription IDs, the first one is used as sub-ntf subscription ID. * @param[out] sub_id_count Number of @p sub_ids. + * @param[in] sr_sub_ctx is the sysrepo context. * @return Sysrepo error value. */ static int @@ -879,17 +881,18 @@ yang_push_notif_update_send(struct nc_session *ncs, sr_session_ctx_t *sr_sess, s const struct ly_ctx *ly_ctx; sr_data_t *data = NULL; char buf[11]; - int rc = SR_ERR_OK; + int rc = SR_ERR_OK, r; /* switch to the datastore */ sr_session_switch_ds(sr_sess, yp_data->datastore); /* get the data from sysrepo */ - rc = sr_get_data(sr_sess, yp_data->xpath ? yp_data->xpath : "/*", 0, np2srv.sr_timeout, 0, &data); - if (rc != SR_ERR_OK) { - if (ncs) { - goto cleanup; - } + r = sr_get_data(sr_sess, yp_data->xpath ? yp_data->xpath : "/*", 0, np2srv.sr_timeout, 0, &data); + if (r == SR_ERR_NOT_FOUND) { + WRN("XPath \"%s\" does not match any schema or data nodes."); + } else if (r) { + rc = r; + goto cleanup; } /* context lock is already held by data */