Skip to content

Commit

Permalink
update(anomalydetection): ability to reset data structures w/ timers
Browse files Browse the repository at this point in the history
Signed-off-by: Melissa Kilby <[email protected]>
  • Loading branch information
incertum committed Aug 15, 2024
1 parent 8333473 commit a59e689
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 14 deletions.
4 changes: 3 additions & 1 deletion plugins/anomalydetection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ plugins:
{
"fields": "%container.id %proc.name %proc.aname[1] %proc.aname[2] %proc.aname[3] %proc.exepath %proc.tty %proc.vpgid.name %proc.sname",
# execve, execveat
"event_codes": [293, 331]
"event_codes": [293, 331],
# optional config `reset_timer_ms`, resets the data structure every x milliseconds, here one hour as example
"reset_timer_ms": 3600000
},
{
"fields": "%container.id %proc.name %proc.aname[1] %proc.aname[2] %proc.aname[3] %proc.exepath %proc.tty %proc.vpgid.name %proc.sname %fd.name",
Expand Down
11 changes: 11 additions & 0 deletions plugins/anomalydetection/src/num/cms.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#include "xxhash_ext.h"

#include <iostream>
Expand Down Expand Up @@ -96,6 +98,15 @@ class cms
}
}

void reset()
{
// Reset data structure
for (uint64_t i = 0; i < d_; ++i)
{
std::fill(sketch[i].get(), sketch[i].get() + w_, static_cast<T>(0));
}
}

uint64_t hash_XXH3_seed(std::string value, uint64_t seed) const
{
// using https://raw.githubusercontent.com/Cyan4973/xxHash/v0.8.2/xxhash.h
Expand Down
53 changes: 43 additions & 10 deletions plugins/anomalydetection/src/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ falcosecurity::init_schema anomalydetection::get_init_schema()
"type": "number",
"description": "PPME event codes supported by Falco."
}
},
"reset_timer_ms": {
"type": "number",
"description": "The anomaly detection behavior profile timer, in milliseconds (ms), is used to reset the sketch counts."
}
},
"required": [
Expand Down Expand Up @@ -134,16 +138,17 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json)

// If used, config JSON schema enforces a minimum of 1 items and 2-d sub-arrays
auto gamma_eps_pointer = nlohmann::json::json_pointer("/count_min_sketch/gamma_eps");
m_gamma_eps.clear();
if (config_json.contains(gamma_eps_pointer) && config_json[gamma_eps_pointer].is_array())
{
int i = 0;
int n = 1;
for (const auto& array : config_json[gamma_eps_pointer])
{
if (array.is_array() && array.size() == 2)
{
std::vector<double> sub_array = {array[0].get<double>(), array[1].get<double>()};
log_error("Count min sketch data structure number ("
+ std::to_string(i+1) + ") loaded with gamma and eps values ("
+ std::to_string(n) + ") loaded with gamma and eps values ("
+ std::to_string(sub_array[0]) + ","
+ std::to_string(sub_array[1])
+ ") equivalent to sketch dimensions ("
Expand All @@ -154,15 +159,16 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json)
+ ") bytes of constant memory allocation on the heap");
m_gamma_eps.emplace_back(sub_array);
}
i++;
n++;
}
}

// If used, config JSON schema enforces a minimum of 1 items and 2-d sub-arrays
auto rows_cols_pointer = nlohmann::json::json_pointer("/count_min_sketch/rows_cols");
m_rows_cols.clear();
if (config_json.contains(rows_cols_pointer) && config_json[rows_cols_pointer].is_array())
{
int i = 0;
int n = 1;
if (config_json.contains(gamma_eps_pointer) && config_json[gamma_eps_pointer].is_array())
{
log_error("[Override Notice] Count min sketch data structures will be overriden with below settings as 'rows_cols' config overrides any previous setting");
Expand All @@ -173,7 +179,7 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json)
{
std::vector<uint64_t> sub_array = {array[0].get<uint64_t>(), array[1].get<uint64_t>()};
log_error("Count min sketch data structure number ("
+ std::to_string(i+1) + ") loaded with d and w/buckets values ("
+ std::to_string(n) + ") loaded with d and w/buckets values ("
+ std::to_string(sub_array[0]) + ","
+ std::to_string(sub_array[1])
+ ") equivalent to sketch error probability and relative error tolerances ("
Expand All @@ -184,7 +190,7 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json)
+ ") bytes of constant memory allocation on the heap");
m_rows_cols.emplace_back(sub_array);
}
i++;
n++;
}
}

Expand Down Expand Up @@ -215,6 +221,9 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json)
supported_codes_fd_profile.end()
);

m_reset_timers.clear();
m_behavior_profiles_fields.clear();
m_behavior_profiles_event_codes.clear();
int n = 1;
for (const auto& profile : behavior_profiles)
{
Expand Down Expand Up @@ -260,6 +269,21 @@ void anomalydetection::parse_init_config(nlohmann::json& config_json)
}
}
}
if (profile.contains("reset_timer_ms"))
{
uint64_t interval = profile["reset_timer_ms"].get<uint64_t>();
if (interval > 100)
{
m_reset_timers.emplace_back(interval);
} else
{
m_reset_timers.emplace_back(uint64_t(0));
}
log_error("Behavior profile number (" + std::to_string(n) + ") resets the counts to zero every (" + std::to_string(interval) + ") ms");
} else
{
m_reset_timers.emplace_back(uint64_t(0));
}
m_behavior_profiles_fields.emplace_back(filter_check_fields);
m_behavior_profiles_event_codes.emplace_back(std::move(codes));
n++;
Expand Down Expand Up @@ -375,32 +399,41 @@ bool anomalydetection::init(falcosecurity::init_input& in)
return false;
}

//////////////////////////
////////////////////
// Init sketches
//////////////////////////
////////////////////

// Init the plugin managed state table holding the count min sketch estimates for each behavior profile
m_thread_manager.stop_threads(); // Important for reloading configs conditions
m_count_min_sketches.lock()->clear();
if (m_rows_cols.size() == m_n_sketches)
{
for (uint32_t i = 0; i < m_n_sketches; ++i)
{
uint64_t rows = m_rows_cols[i][0];
uint64_t cols = m_rows_cols[i][1];
m_count_min_sketches.lock()->push_back(std::make_unique<plugin::anomalydetection::num::cms<uint64_t>>(rows, cols));
m_count_min_sketches.lock()->push_back(std::make_shared<plugin::anomalydetection::num::cms<uint64_t>>(rows, cols));
}
} else if (m_gamma_eps.size() == m_n_sketches && m_rows_cols.empty())
{
for (uint32_t i = 0; i < m_n_sketches; ++i)
{
double gamma = m_gamma_eps[i][0];
double eps = m_gamma_eps[i][1];
m_count_min_sketches.lock()->push_back(std::make_unique<plugin::anomalydetection::num::cms<uint64_t>>(gamma, eps));
m_count_min_sketches.lock()->push_back(std::make_shared<plugin::anomalydetection::num::cms<uint64_t>>(gamma, eps));
}
} else
{
return false;
}

// Launch threads to periodically reset the data structures (if applicable)
m_thread_manager.m_stop_requested = false;
for (uint32_t i = 0; i < m_n_sketches; ++i)
{
m_thread_manager.start_periodic_count_min_sketch_reset_worker<uint64_t>(i, (uint64_t)m_reset_timers[i], m_count_min_sketches);
}

return true;
}

Expand Down
9 changes: 8 additions & 1 deletion plugins/anomalydetection/src/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ limitations under the License.
#include "plugin_consts.h"
#include "plugin_utils.h"
#include "plugin_mutex.h"
#include "plugin_thread_manager.h"
#include "plugin_sinsp_filterchecks.h"

#include <falcosecurity/sdk.h>
Expand All @@ -45,6 +46,8 @@ struct sinsp_param
class anomalydetection
{
public:
anomalydetection() : m_thread_manager() {}

// Keep this aligned with `get_fields`
enum anomalydetection_fields
{
Expand Down Expand Up @@ -132,15 +135,19 @@ class anomalydetection

private:

// Manages plugin side threads, such as resetting the count min sketch data structures
ThreadManager m_thread_manager;

bool m_count_min_sketch_enabled = false;
uint32_t m_n_sketches = 0;
std::vector<std::vector<double>> m_gamma_eps;
std::vector<std::vector<uint64_t>> m_rows_cols; // If set supersedes m_gamma_eps
std::vector<std::vector<plugin_sinsp_filterchecks_field>> m_behavior_profiles_fields;
std::vector<std::unordered_set<ppm_event_code>> m_behavior_profiles_event_codes;
std::vector<uint64_t> m_reset_timers;

// Plugin managed state table
plugin_anomalydetection::Mutex<std::vector<std::unique_ptr<plugin::anomalydetection::num::cms<uint64_t>>>> m_count_min_sketches;
plugin_anomalydetection::Mutex<std::vector<std::shared_ptr<plugin::anomalydetection::num::cms<uint64_t>>>> m_count_min_sketches;

// required; standard plugin API
std::string m_lasterr;
Expand Down
117 changes: 117 additions & 0 deletions plugins/anomalydetection/src/plugin_thread_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// SPDX-License-Identifier: Apache-2.0
/*
Copyright (C) 2024 The Falco Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#include "num/cms.h"
#include "plugin_mutex.h"

#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <vector>
#include <memory>

class ThreadManager {
public:
ThreadManager() : m_stop_requested(false) {}

~ThreadManager()
{
stop_threads();
}

void stop_threads()
{
{
std::lock_guard<std::mutex> lock(m_thread_mutex);
m_stop_requested = true;
}

{
std::lock_guard<std::mutex> lock(m_thread_mutex);
for (auto& t : m_threads)
{
if (t.joinable())
{
t.join();
}
}
m_threads.clear();
}
}

template<typename T>
void start_periodic_count_min_sketch_reset_worker(int id, uint64_t interval_ms, plugin_anomalydetection::Mutex<std::vector<std::shared_ptr<plugin::anomalydetection::num::cms<T>>>>& count_min_sketches)
{
if (interval_ms > 100)
{
auto worker = [id, interval_ms, &count_min_sketches, this]() {
periodic_count_min_sketch_reset_worker<T>(id, interval_ms, count_min_sketches);
};

std::thread worker_thread(worker);
{
std::lock_guard<std::mutex> lock(m_thread_mutex);
m_threads.push_back(std::move(worker_thread));
}
}
}
std::atomic<bool> m_stop_requested;

private:
std::vector<std::thread> m_threads;
std::mutex m_thread_mutex;

template<typename T>
void reset_sketches_worker(int id, plugin_anomalydetection::Mutex<std::vector<std::shared_ptr<plugin::anomalydetection::num::cms<T>>>>& count_min_sketches)
{
auto sketches = count_min_sketches.lock();
if (id >= 0 && id < sketches->size())
{
auto& sketch_ptr = sketches->at(id);
if (sketch_ptr)
{
sketch_ptr->reset();
}
}
}

template<typename T>
void periodic_count_min_sketch_reset_worker(int id, uint64_t interval_ms, plugin_anomalydetection::Mutex<std::vector<std::shared_ptr<plugin::anomalydetection::num::cms<T>>>>& count_min_sketches)
{
std::chrono::milliseconds interval(interval_ms);
while (true)
{
std::this_thread::sleep_for(interval);
{
std::lock_guard<std::mutex> lock(m_thread_mutex);
if (m_stop_requested)
break;
}

try
{
reset_sketches_worker<T>(id, count_min_sketches);
} catch (const std::exception& e)
{
}
}
}
};
2 changes: 2 additions & 0 deletions plugins/anomalydetection/src/plugin_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

#include "plugin_utils.h"

#define SCAP_MAX_PATH_SIZE 1024

static const filtercheck_field_info sinsp_filter_check_fields[] =
{
{PT_CHARBUF, EPF_ANOMALY_PLUGIN | EPF_NONE, PF_NA, "proc.exe", "First Argument", "The first command-line argument (i.e., argv[0]), typically the executable name or a custom string as specified by the user. It is primarily obtained from syscall arguments, truncated after 4096 bytes, or, as a fallback, by reading /proc/PID/cmdline, in which case it may be truncated after 1024 bytes. This field may differ from the last component of proc.exepath, reflecting how command invocation and execution paths can vary."},
Expand Down
2 changes: 0 additions & 2 deletions plugins/anomalydetection/src/plugin_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ limitations under the License.
#include <regex>
#include <unordered_set>

#define SCAP_MAX_PATH_SIZE 1024

typedef struct plugin_sinsp_filterchecks_field
{
plugin_sinsp_filterchecks::check_type id;
Expand Down

0 comments on commit a59e689

Please sign in to comment.