Skip to content

Commit

Permalink
Mon 16176 add elasticsearch metricv2 username (#156)
Browse files Browse the repository at this point in the history
* start elastic metric v2

* set user & password as optionnal params

* add new params accepted_hosts and services

* try to fix conflicts

* change name of indexes (replace . by _)

* better naming convention

* add doc for params and new methods

* fix pattern filter

* remove debug and fix json

* fix missing service_desc in payload

* add min_max and thresholds dimension

* fix metric host and add anti spam log system

* improve error logging

* Update modules/centreon-stream-connectors-lib/sc_event.lua

Co-authored-by: cg-tw <[email protected]>

* Update modules/centreon-stream-connectors-lib/sc_event.lua

Co-authored-by: cg-tw <[email protected]>

* Update modules/docs/README.md

Co-authored-by: cg-tw <[email protected]>

* Apply suggestions from code review

Co-authored-by: cg-tw <[email protected]>

---------

Co-authored-by: cg-tw <[email protected]>
  • Loading branch information
tanguyvda and cg-tw authored Jan 19, 2024
1 parent c706ac6 commit dce5ece
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 92 deletions.
181 changes: 134 additions & 47 deletions centreon-certified/elasticsearch/elastic-metrics-apiv2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ function EventQueue.new(params)
local self = {}

local mandatory_parameters = {
"elastic_username",
"elastic_password",
-- "elastic_username",
-- "elastic_password",
"http_server_url"
}

self.fail = false
self.last_fail_message_date = 0
self.fail_message_counter = 0

-- set up log configuration
local logfile = params.logfile or "/var/log/centreon-broker/elastic-metrics.log"
Expand All @@ -60,8 +62,8 @@ function EventQueue.new(params)
end

-- overriding default parameters for this stream connector if the default values doesn't suit the basic needs
self.sc_params.params.elastic_username = params.elastic_username
self.sc_params.params.elastic_password = params.elastic_password
self.sc_params.params.elastic_username = params.elastic_username or ""
self.sc_params.params.elastic_password = params.elastic_password or ""
self.sc_params.params.http_server_url = params.http_server_url
self.sc_params.params.accepted_categories = params.accepted_categories or "neb"
self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status"
Expand All @@ -84,6 +86,8 @@ function EventQueue.new(params)
self.sc_params.params.add_hostgroups_dimension = params.add_hostgroups_dimension or 1
self.sc_params.params.add_poller_dimension = params.add_poller_dimension or 0
self.sc_params.params.add_servicegroups_dimension = params.add_servicegroups_dimension or 0
self.sc_params.params.add_min_max_dimension = params.add_min_max_dimension or 0
self.sc_params.params.add_thresholds_dimension = params.add_thresholds_dimension or 0
-- can't get geo coords from cache nor event
-- self.sc_params.params.add_geocoords_dimension = params.add_geocoords_dimension or 0

Expand Down Expand Up @@ -145,10 +149,10 @@ function EventQueue:build_index_template(params)
}

self.index_routing_path = {
"host.name",
"service.description",
"metric.name",
"metric.instance",
"host_name",
"service_description",
"metric_name",
"metric_instance",
-- "metric.subinstances"
}

Expand All @@ -162,31 +166,31 @@ function EventQueue:build_index_template(params)
},
mappings = {
properties = {
["host.name"] = {
["host_name"] = {
type = "keyword",
time_series_dimension = true
},
["service.description"] = {
["service_description"] = {
type = "keyword",
time_series_dimension = true
},
["metric.name"] = {
["metric_name"] = {
type = "keyword",
time_series_dimension = true
},
["metric.unit"] = {
["metric_unit"] = {
type = "keyword",
time_series_dimension = false
},
["metric.instance"] = {
["metric_instance"] = {
type = "keyword",
time_series_dimension = true
},
["metric.subinstances"] = {
["metric_subinstances"] = {
type = "keyword",
time_series_dimension = false
},
["metric.value"] = {
["metric_value"] = {
type = "double",
time_series_metric = gauge
},
Expand All @@ -201,7 +205,7 @@ function EventQueue:build_index_template(params)

-- add hostgroup property in the template
if params.add_hostgroups_dimension == 1 then
self.elastic_index_template.template.mappings.properties["host.groups"] = {
self.elastic_index_template.template.mappings.properties["host_groups"] = {
type = "keyword",
time_series_dimension = false
}
Expand All @@ -211,7 +215,7 @@ function EventQueue:build_index_template(params)

-- add servicegroup property in the template
if params.add_servicegroups_dimension == 1 then
self.elastic_index_template.template.mappings.properties["service.groups"] = {
self.elastic_index_template.template.mappings.properties["service_groups"] = {
type = "keyword",
time_series_dimension = false
}
Expand All @@ -229,6 +233,42 @@ function EventQueue:build_index_template(params)
-- table.insert(self.index_routing_path, "poller")
end

-- add min and max property in the template
if params.add_min_max_dimension == 1 then
self.elastic_index_template.template.mappings.properties["metric_min"] = {
type = "keyword",
time_series_dimension = false
}

self.elastic_index_template.template.mappings.properties["metric_max"] = {
type = "keyword",
time_series_dimension = false
}
end

-- add warn and max property in the template
if params.add_thresholds_dimension == 1 then
self.elastic_index_template.template.mappings.properties["metric_warning_low"] = {
type = "keyword",
time_series_dimension = false
}

self.elastic_index_template.template.mappings.properties["metric_warning_high"] = {
type = "keyword",
time_series_dimension = false
}

self.elastic_index_template.template.mappings.properties["metric_critical_low"] = {
type = "keyword",
time_series_dimension = false
}

self.elastic_index_template.template.mappings.properties["metric_critical_high"] = {
type = "keyword",
time_series_dimension = false
}
end


self.elastic_index_template.template.settings["index.routing_path"] = self.index_routing_path
-- add geocoords property in the template
Expand Down Expand Up @@ -264,6 +304,7 @@ function EventQueue:check_index_template(params)
}

local return_code = self:send_data(payload, metadata)

if return_code then
self.sc_logger:notice("[EventQueue:check_index_template]: Elasticsearch index template " .. tostring(params.index_name) .. " has been found")
index_state.is_created = true
Expand Down Expand Up @@ -306,6 +347,11 @@ function EventQueue:create_index_template(params)
end

function EventQueue:validate_index_template(params)
if self.sc_params.params.send_data_test == 1 then
self.sc_logger:notice("[EventQueue:validate_index_template]: send_data_test is set to 1, ignoring template validation")
return true
end

local index_template_structure, error = broker.json_decode(self.elastic_result)

if error then
Expand All @@ -314,21 +360,33 @@ function EventQueue:validate_index_template(params)
end

local required_index_mapping_properties = {
"host.name",
"service.description",
"metric.value",
"metric.unit",
"metric.value",
"metric.instance",
"metric.subinstances"
"host_name",
"service_description",
"metric_value",
"metric_unit",
"metric_value",
"metric_instance",
"metric_subinstances"
}

if params.add_hostgroups_dimension == 1 then
table.insert(required_index_mapping_properties, "host.groups")
table.insert(required_index_mapping_properties, "host_groups")
end

if params.add_servicegroups_dimension == 1 then
table.insert(required_index_mapping_properties, "service.groups")
table.insert(required_index_mapping_properties, "service_groups")
end

if params.add_min_max_dimension == 1 then
table.insert(required_index_mapping_properties, "metric_min")
table.insert(required_index_mapping_properties, "metric_max")
end

if params.add_thresholds_dimension == 1 then
table.insert(required_index_mapping_properties, "metric_warning_low")
table.insert(required_index_mapping_properties, "metric_warning_high")
table.insert(required_index_mapping_properties, "metric_critical_low")
table.insert(required_index_mapping_properties, "metric_critical_high")
end

-- can't get geo coords from cache nor event
Expand Down Expand Up @@ -403,7 +461,6 @@ end
function EventQueue:format_event_host()
local event = self.sc_event.event
self.sc_logger:debug("[EventQueue:format_event_host]: call build_metric ")
self.sc_event.event.formated_event = {}
self.sc_metrics:build_metric(self.format_metric[event.category][event.element])
end

Expand All @@ -413,7 +470,6 @@ end
function EventQueue:format_event_service()
self.sc_logger:debug("[EventQueue:format_event_service]: call build_metric ")
local event = self.sc_event.event
self.sc_event.event.formated_event = {}
self.sc_metrics:build_metric(self.format_metric[event.category][event.element])
end

Expand All @@ -424,7 +480,7 @@ end
function EventQueue:format_metric_host(metric)
self.sc_logger:debug("[EventQueue:format_metric_host]: call format_metric host")
self:add_generic_information(metric)
self:add_generic_optional_information()
self:add_generic_optional_information(metric)
self:add()
end

Expand All @@ -435,9 +491,9 @@ end
function EventQueue:format_metric_service(metric)
self.sc_logger:debug("[EventQueue:format_metric_service]: call format_metric service")

self.sc_event.event.formated_event["service.description"] = tostring(self.sc_event.event.cache.service.description)
self:add_generic_information(metric)
self:add_generic_optional_information()
self.sc_event.event.formated_event["service_description"] = tostring(self.sc_event.event.cache.service.description)
self:add_generic_optional_information(metric)
self:add_service_optional_information()
self:add()
end
Expand All @@ -446,16 +502,16 @@ function EventQueue:add_generic_information(metric)
local event = self.sc_event.event
self.sc_event.event.formated_event = {
["@timestamp"] = event.last_check,
["host.name"] = tostring(event.cache.host.name),
["metric.name"] = tostring(metric.metric_name),
["metric.value"] = metric.value,
["metric.instance"] = metric.instance,
["metric.subinstances"] = metric.subinstances,
["metric.unit"] = metric.unit
["host_name"] = tostring(event.cache.host.name),
["metric_name"] = tostring(metric.metric_name),
["metric_value"] = metric.value,
["metric_instance"] = metric.instance,
["metric_subinstances"] = metric.subinstances,
["metric_unit"] = metric.unit
}
end

function EventQueue:add_generic_optional_information()
function EventQueue:add_generic_optional_information(metric)
local params = self.sc_event.params
local event = self.sc_event.event

Expand All @@ -467,13 +523,35 @@ function EventQueue:add_generic_optional_information()
table.insert(hostgroups, hg_info.group_name)
end

self.sc_event.event.formated_event["host.groups"] = hostgroups
self.sc_event.event.formated_event["host_groups"] = hostgroups
end

-- add poller
if params.add_poller_dimension == 1 then
self.sc_event.event.formated_event.poller = event.cache.poller
end

-- add min and max
if params.add_min_max_dimension == 1 then
self.sc_event.event.formated_event.metric_min = self:handle_NaN(metric.min)
self.sc_event.event.formated_event.metric_max = self:handle_NaN(metric.max)
end

-- add thresholds
if params.add_thresholds_dimension == 1 then
self.sc_event.event.formated_event.metric_warning_low = self:handle_NaN(metric.warning_low)
self.sc_event.event.formated_event.metric_warning_high = self:handle_NaN(metric.warning_high)
self.sc_event.event.formated_event.metric_critical_low = self:handle_NaN(metric.critical_low)
self.sc_event.event.formated_event.metric_critical_high = self:handle_NaN(metric.critical_high)
end
end

function EventQueue:handle_NaN(value)
if value == value then
return value
end

return nil
end

function EventQueue:add_service_optional_information()
Expand All @@ -485,7 +563,7 @@ function EventQueue:add_service_optional_information()
table.insert(servicegroups, sg_info.group_name)
end

self.sc_event.event.formated_event["service.groups"] = servicegroups
self.sc_event.event.formated_event["service_groups"] = servicegroups
end
end

Expand All @@ -502,7 +580,6 @@ function EventQueue:add()

self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events))
self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event

self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events)
.. "max is: " .. tostring(self.sc_params.params.max_buffer_size))
end
Expand All @@ -528,12 +605,14 @@ function EventQueue:send_data(payload, queue_metadata)
local params = self.sc_params.params
local url = params.http_server_url .. queue_metadata.endpoint
queue_metadata.headers = {
"Authorization: Basic " .. mime.b64(params.elastic_username .. ":" .. params.elastic_password),
"Content-type: application/json"
}


if payload then
if (params.elastic_username ~= "" and params.elastic_password ~= "") then
table.insert(queue_metadata.headers, "Authorization: Basic " .. mime.b64(params.elastic_username .. ":" .. params.elastic_password))
end

if payload or queue_metadata.method == "GET" then
-- write payload in the logfile for test purpose
if params.send_data_test == 1 then
self.sc_logger:notice("[send_data]: " .. tostring(payload))
Expand Down Expand Up @@ -600,7 +679,7 @@ function EventQueue:send_data(payload, queue_metadata)

if error_json then
self.sc_logger:error("[EventQueue:send_data]: Couldn't decode json from elasticsearch. Error is: " .. tostring(error_json)
.. ". Received json is: " .. tostring(http_response_body))
.. ". Received json is: " .. tostring(http_response_body) .. ". Sent data is: " .. tostring(payload))
return false
end

Expand All @@ -610,7 +689,7 @@ function EventQueue:send_data(payload, queue_metadata)
end


self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body))
self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body) .. ". Sent data is: " .. tostring(payload))
return false
end

Expand All @@ -633,7 +712,15 @@ end
function write (event)
-- skip event if a mandatory parameter is missing
if queue.fail then
queue.sc_logger:error("Skipping event because a mandatory parameter is not set or elastic index is not valid")
if queue.fail_message_counter <= 3 and queue.last_fail_message_date + 30 < os.time(os.date("*t")) then
queue.sc_logger:error("Skipping event because a mandatory parameter is not set or elastic index is not valid")
queue.last_fail_message_date = os.time(os.date("*t"))
queue.fail_message_counter = queue.fail_message_counter + 1
elseif queue.fail_message_counter > 3 and queue.last_fail_message_date + 300 < os.time(os.date("*t")) then
queue.sc_logger:error("Skipping event because a mandatory parameter is not set or elastic index is not valid")
queue.last_fail_message_date = os.time(os.date("*t"))
queue.fail_message_counter = queue.fail_message_counter + 1
end
return false
end

Expand Down
2 changes: 1 addition & 1 deletion centreon-certified/splunk/splunk-metrics-apiv2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ function EventQueue.new(params)

-- set up log configuration
local logfile = params.logfile or "/var/log/centreon-broker/splunk-metrics.log"
local log_level = params.log_level or 3
local log_level = params.log_level or 1

-- initiate mandatory objects
self.sc_logger = sc_logger.new(logfile, log_level)
Expand Down
Loading

0 comments on commit dce5ece

Please sign in to comment.