Skip to content

Commit

Permalink
add centreon-report-events
Browse files Browse the repository at this point in the history
  • Loading branch information
tanguyvda committed Nov 13, 2024
1 parent a0f4205 commit 04ff4d0
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 33 deletions.
132 changes: 103 additions & 29 deletions centreon-certified/centreon-report/centreon-report-events-apiv2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ function EventQueue.new(params)

local mandatory_parameters = {
"api_token",
"warp10_http_address"
"centreon_report_http_address"
}

self.fail = false

-- set up log configuration
local logfile = params.logfile or "/var/log/centreon-broker/warp10-events.log"
local logfile = params.logfile or "/var/log/centreon-broker/centreon-report-events.log"
local log_level = params.log_level or 1

-- initiate mandatory objects
Expand All @@ -62,8 +62,10 @@ function EventQueue.new(params)
hg = false,
sg = false,
poller = false,
bv = false,
},
labels = {
bv = false,
hg = false,
sg = false,
poller = false,
Expand All @@ -73,13 +75,13 @@ function EventQueue.new(params)

-- overriding default parameters for this stream connector if the default values doesn't suit the basic needs
self.sc_params.params.api_token = params.api_token
self.sc_params.params.warp10_address = params.warp10_http_address
self.sc_params.params.warp10_api_endpoint = params.warp10_api_endpoint or "/api/v0/update"
self.sc_params.params.centreon_report_http_address = params.centreon_report_http_address
self.sc_params.params.centreon_report_api_endpoint = params.centreon_report_api_endpoint or "/v1"
self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or ""
self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or ""
self.sc_params.params.cmaas = params.cmaas or "centreon"
self.sc_params.params.accepted_categories = params.accepted_categories or "neb"
self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status"
self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status,downtime"
self.sc_params.params.max_buffer_size = params.max_buffer_size or 30
self.sc_params.params.hard_only = params.hard_only or 0
self.sc_params.params.enable_host_status_dedup = params.enable_host_status_dedup or 0
Expand All @@ -106,14 +108,12 @@ function EventQueue.new(params)
self.format_event = {
[categories.neb.id] = {
[elements.host_status.id] = function () return self:format_event_host() end,
[elements.service_status.id] = function () return self:format_event_service() end
}
}

self.format_metric = {
[categories.neb.id] = {
[elements.host_status.id] = function (metric) return self:format_metric_host(metric) end,
[elements.service_status.id] = function (metric) return self:format_metric_service(metric) end
[elements.service_status.id] = function () return self:format_event_service() end,
[elements.downtime.id] = function () return self:format_event_downtime() end,
[elements.acknowledgement.id] = function () return self:format_event_acknowledgement() end
},
[categories.bam.id] = {
[elements.ba_status.id] = function () return self:format_event_ba() end
}
}

Expand Down Expand Up @@ -222,22 +222,69 @@ function EventQueue:format_event_service()
}
end

-- makes no sense at the moment
-- function EventQueue:format_event_downtime()
-- local event = self.sc_event.event
function EventQueue:format_event_downtime()
local event = self.sc_event.event
-- 0 = end of downtime, 1 = start of downtime
local downtime_step = 0
local event_time = event.deletion_time

if event.downtime_processing_step == "start" then
downtime_step = 1
event_time = event.actual_start_time
end

self.sc_event.event.formated_event = {
data = event_time .. "000000// centreon:downtime" .. self:build_labels() .. self:build_attributes() .. " " .. downtime_step
}
end

function EventQueue:format_event_acknowledgement()
local event = self.sc_event.event
-- 0 = end of ack, 1 = start of ack
local ack_step = 0
local event_time = event.deletion_time

-- self.sc_event.event.formated_event = {
-- data = event.last_check .. "000000// centreon:downtime" .. self:build_labels() .. self:build_attributes() .. " " .. event.state
-- }
-- end
if not event.deletion_time or event.deletion_time == 0 then
event_time = event.entry_time
ack_step = 1
end

self.sc_event.event.formated_event = {
data = event_time .. "000000// centreon:acknowledgement" .. self:build_labels() .. self:build_attributes() .. " " .. ack_step
}
end

function EventQueue:format_event_ba()
local event = self.sc_event.event

local attributes = ""
local labels = "ba_name=" .. event.cache.ba.ba_name .. ",resource_type=BA,ba_id=" .. event.ba_id

-- add business views name in attributes if asked to
if event.cache.bvs and event.cache.bvs[1] and self.warp10_format.attributes.bv then
attributes = "{bvs=" .. broker.url_encode(self:get_bv_string(event)) .. "}"
end

if self.warp10_format.labels._cmaas then
labels = labels .. ",_cmaas=" .. broker.url_encode(self.sc_params.params.cmaas)
end

-- add business views name in labels if asked to
if event.cache.bvs and event.cache.bvs[1] and self.warp10_format.labels.bv then
labels = labels .. ",bvs=" .. broker.url_encode(self:get_bv_string(event))
end

self.sc_event.event.formated_event = {
data = event.last_state_change .. "000000// centreon:ba{" .. labels .. "}" .. attributes .. " " .. event.state
}
end


--------------------------------------------------------------------------------
---- EventQueue:build_labels method
-- @param metric {table} a single metric data
-- @return labels_string {string} a string with all labels
--------------------------------------------------------------------------------
function EventQueue:build_labels(metric)
function EventQueue:build_labels()
local event = self.sc_event.event
local params = self.sc_params.params
local resource_type = "host"
Expand All @@ -263,10 +310,9 @@ end

--------------------------------------------------------------------------------
---- EventQueue:build_attributes method
-- @param metric {table} a single metric data
-- @return tags {table} a string with all attributes
--------------------------------------------------------------------------------
function EventQueue:build_attributes(metric)
function EventQueue:build_attributes()
local event = self.sc_event.event
local attributes_string = ""

Expand Down Expand Up @@ -330,6 +376,20 @@ function EventQueue:get_sg_string(event)
return servicegroups_string
end

function EventQueue:get_bv_string(event)
local bvs_string = ""

for _, bv in ipairs(event.cache.bvs) do
if bvss_string == "" then
bvs_string = bvs.bv_name
else
bvs_string = bvs_string .. " " .. bv.bv_name
end
end

return bvs_string
end

--------------------------------------------------------------------------------
-- EventQueue:add, add an event to the sending queue
--------------------------------------------------------------------------------
Expand All @@ -355,10 +415,23 @@ end
-- @return payload {string} json encoded string
--------------------------------------------------------------------------------
function EventQueue:build_payload(payload, event)
local data_table = {
type = "metric",
timestamp = os.date("!%Y-%m-%dT%TZ", t),
version = self.sc_params.params.api_version,
contentType = "warp10/plaintext",
content = event.data
}

if not payload then
payload = event.data
payload = {
version = self.sc_params.params.api_version,
data = {
data_table
}
}
else
payload = payload .. "\n" .. event.data
table.insert(payload.data, data_table)
end

return payload
Expand All @@ -367,10 +440,11 @@ end
function EventQueue:send_data(payload, queue_metadata)
self.sc_logger:debug("[EventQueue:send_data]: Starting to send data")

local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint)
payload = broker.json_encode(payload)
local url = tostring(self.sc_params.params.centreon_report_http_address) .. tostring(self.sc_params.params.centreon_report_api_endpoint)
queue_metadata.headers = {
"Transfer-Encoding:chunked",
"X-Warp10-Token:" .. self.sc_params.params.api_token
"Content-Type: application/vnd.centreon+json",
"x-api-key: " .. self.sc_params.params.api_token
}

self.sc_logger:log_curl_command(url, queue_metadata, self.sc_params.params, payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ function EventQueue.new(params)

-- overriding default parameters for this stream connector if the default values doesn't suit the basic needs
self.sc_params.params.api_token = params.api_token
self.sc_params.params.warp10_address = params.centreon_report_http_address
self.sc_params.params.warp10_api_endpoint = params.centreon_report_api_endpoint or "/v1"
self.sc_params.params.centreon_report_http_address = params.centreon_report_http_address
self.sc_params.params.centreon_report_api_endpoint = params.centreon_report_api_endpoint or "/v1"
self.sc_params.params.warp10_accepted_labels = params.warp10_accepted_labels or ""
self.sc_params.params.warp10_accepted_attributes = params.warp10_accepted_attributes or ""
self.sc_params.params.version = params.version or "1.0.0"
Expand Down Expand Up @@ -460,7 +460,7 @@ function EventQueue:send_data(payload, queue_metadata)
self.sc_logger:debug("[EventQueue:send_data]: Starting to send data")

payload = broker.json_encode(payload)
local url = tostring(self.sc_params.params.warp10_address) .. tostring(self.sc_params.params.warp10_api_endpoint)
local url = tostring(self.sc_params.params.centreon_report_http_address) .. tostring(self.sc_params.params.centreon_report_api_endpoint)
queue_metadata.headers = {
"Content-Type: application/vnd.centreon+json",
"x-api-key: " .. self.sc_params.params.api_token
Expand Down
4 changes: 3 additions & 1 deletion modules/centreon-stream-connectors-lib/sc_event.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ function ScEvent:is_valid_acknowledgement_event()
return true
end

--- is_vaid_downtime_event: check if the event is a valid downtime event
--- is_valid_downtime_event: check if the event is a valid downtime event
-- return true|false (boolean)
function ScEvent:is_valid_downtime_event()
-- return false if the event is one of all the "fake" start or end downtime event received from broker
Expand Down Expand Up @@ -1329,6 +1329,7 @@ function ScEvent:is_valid_downtime_event_start()
end
-- end compat patch

self.event.downtime_processing_step = "start"
return true
end

Expand All @@ -1347,6 +1348,7 @@ function ScEvent:is_valid_downtime_event_end()
end
-- end compat patch

self.event.downtime_processing_step = "end"
return true
end

Expand Down

0 comments on commit 04ff4d0

Please sign in to comment.