Skip to content

Commit

Permalink
Add GetLinuxHeadersStatus UDTF and update GetAgentStatus UDTF to incl…
Browse files Browse the repository at this point in the history
…ude kernel_headers_installed

Signed-off-by: Dom Del Nano <[email protected]>
  • Loading branch information
ddelnano committed Dec 2, 2024
1 parent f817a59 commit 779cdf4
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/vizier/funcs/md_udtfs/md_udtfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ void RegisterFuncsOrDie(const VizierFuncFactoryContext& ctx, carnot::udf::Regist
registry->RegisterFactoryOrDie<GetProfilerSamplingPeriodMS,
UDTFWithMDFactory<GetProfilerSamplingPeriodMS>>(
"GetProfilerSamplingPeriodMS", ctx);
registry->RegisterFactoryOrDie<GetLinuxHeadersStatus, UDTFWithMDFactory<GetLinuxHeadersStatus>>(
"GetLinuxHeadersStatus", ctx);

registry->RegisterOrDie<GetDebugMDState>("_DebugMDState");
registry->RegisterFactoryOrDie<GetDebugMDWithPrefix, UDTFWithMDFactory<GetDebugMDWithPrefix>>(
Expand Down
64 changes: 63 additions & 1 deletion src/vizier/funcs/md_udtfs/md_udtfs_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ namespace px {
namespace vizier {
namespace funcs {
namespace md {

constexpr std::string_view kKernelHeadersInstalledDesc =
"Whether the agent had linux headers pre-installed";

template <typename TUDTF>
class UDTFWithMDFactory : public carnot::udf::UDTFFactory {
public:
Expand Down Expand Up @@ -295,7 +299,9 @@ class GetAgentStatus final : public carnot::udf::UDTF<GetAgentStatus> {
ColInfo("create_time", types::DataType::TIME64NS, types::PatternType::GENERAL,
"The creation time of the agent"),
ColInfo("last_heartbeat_ns", types::DataType::INT64, types::PatternType::GENERAL,
"Time (in nanoseconds) since the last heartbeat"));
"Time (in nanoseconds) since the last heartbeat"),
ColInfo("kernel_headers_installed", types::DataType::BOOLEAN, types::PatternType::GENERAL,
kKernelHeadersInstalledDesc));
}

Status Init(FunctionContext*) {
Expand Down Expand Up @@ -330,6 +336,8 @@ class GetAgentStatus final : public carnot::udf::UDTF<GetAgentStatus> {
rw->Append<IndexOf("agent_state")>(StringValue(magic_enum::enum_name(agent_status.state())));
rw->Append<IndexOf("create_time")>(agent_info.create_time_ns());
rw->Append<IndexOf("last_heartbeat_ns")>(agent_status.ns_since_last_heartbeat());
rw->Append<IndexOf("kernel_headers_installed")>(
agent_info.info().host_info().kernel_headers_installed());

++idx_;
return idx_ < resp_->info_size();
Expand Down Expand Up @@ -396,6 +404,60 @@ class GetProfilerSamplingPeriodMS final : public carnot::udf::UDTF<GetProfilerSa
std::function<void(grpc::ClientContext*)> add_context_authentication_func_;
};

/**
* This UDTF retrieves the status of the agents' Linux headers installation.
*/
class GetLinuxHeadersStatus final : public carnot::udf::UDTF<GetLinuxHeadersStatus> {
public:
using MDSStub = vizier::services::metadata::MetadataService::Stub;
using SchemaResponse = vizier::services::metadata::SchemaResponse;
GetLinuxHeadersStatus() = delete;
GetLinuxHeadersStatus(std::shared_ptr<MDSStub> stub,
std::function<void(grpc::ClientContext*)> add_context_authentication)
: idx_(0), stub_(stub), add_context_authentication_func_(add_context_authentication) {}

static constexpr auto Executor() { return carnot::udfspb::UDTFSourceExecutor::UDTF_ONE_KELVIN; }

static constexpr auto OutputRelation() {
return MakeArray(
ColInfo("asid", types::DataType::INT64, types::PatternType::GENERAL, "The Agent Short ID"),
ColInfo("kernel_headers_installed", types::DataType::BOOLEAN, types::PatternType::GENERAL,
kKernelHeadersInstalledDesc));
}

Status Init(FunctionContext*) {
px::vizier::services::metadata::AgentInfoRequest req;
resp_ = std::make_unique<px::vizier::services::metadata::AgentInfoResponse>();

grpc::ClientContext ctx;
add_context_authentication_func_(&ctx);
auto s = stub_->GetAgentInfo(&ctx, req, resp_.get());
if (!s.ok()) {
return error::Internal("Failed to make RPC call to GetAgentInfo");
}
return Status::OK();
}

bool NextRecord(FunctionContext*, RecordWriter* rw) {
const auto& agent_metadata = resp_->info(idx_);
const auto& agent_info = agent_metadata.agent();

const auto asid = agent_info.asid();
const auto kernel_headers_installed = agent_info.info().host_info().kernel_headers_installed();
rw->Append<IndexOf("asid")>(asid);
rw->Append<IndexOf("kernel_headers_installed")>(kernel_headers_installed);

++idx_;
return idx_ < resp_->info_size();
}

private:
int idx_ = 0;
std::unique_ptr<px::vizier::services::metadata::AgentInfoResponse> resp_;
std::shared_ptr<MDSStub> stub_;
std::function<void(grpc::ClientContext*)> add_context_authentication_func_;
};

namespace internal {
inline rapidjson::GenericStringRef<char> StringRef(std::string_view s) {
return rapidjson::GenericStringRef<char>(s.data(), s.size());
Expand Down
6 changes: 3 additions & 3 deletions src/vizier/services/agent/kelvin/kelvin_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ int main(int argc, char** argv) {
LOG(INFO) << absl::Substitute("Pixie Kelvin. Version: $0, id: $1, kernel: $2",
px::VersionInfo::VersionString(), agent_id.str(),
kernel_version.ToString());
auto manager = KelvinManager::Create(agent_id, FLAGS_pod_name, FLAGS_host_ip, addr,
FLAGS_rpc_port, FLAGS_nats_url, mds_addr, kernel_version)
.ConsumeValueOrDie();
auto manager =
KelvinManager::Create(agent_id, FLAGS_pod_name, FLAGS_host_ip, addr, FLAGS_rpc_port,
FLAGS_nats_url, mds_addr, kernel_version, /* kernel_headers_installed */ false)

TerminationHandler::set_manager(manager.get());

Expand Down
6 changes: 4 additions & 2 deletions src/vizier/services/agent/kelvin/kelvin_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ class KelvinManager : public Manager {
KelvinManager() = delete;
KelvinManager(sole::uuid agent_id, std::string_view pod_name, std::string_view host_ip,
std::string_view addr, int grpc_server_port, std::string_view nats_url,
std::string_view mds_url, system::KernelVersion kernel_version)
std::string_view mds_url, system::KernelVersion kernel_version,
bool kernel_headers_installed)
: Manager(agent_id, pod_name, host_ip, grpc_server_port, KelvinManager::Capabilities(),
KelvinManager::Parameters(), nats_url, mds_url, kernel_version) {
KelvinManager::Parameters(), nats_url, mds_url, kernel_version,
kernel_headers_installed) {
info()->address = std::string(addr);
}

Expand Down
20 changes: 17 additions & 3 deletions src/vizier/services/agent/pem/pem_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "src/common/base/base.h"
#include "src/common/signal/signal.h"
#include "src/common/system/kernel_version.h"
#include "src/common/system/linux_headers_utils.h"
#include "src/shared/version/version.h"

DEFINE_string(nats_url, gflags::StringFromEnv("PL_NATS_URL", "pl-nats"),
Expand All @@ -41,6 +42,8 @@ using ::px::vizier::agent::DefaultDeathHandler;
using ::px::vizier::agent::PEMManager;
using ::px::vizier::agent::TerminationHandler;

constexpr std::string_view kLinuxHeadersPath = "/lib/modules";

int main(int argc, char** argv) {
px::EnvironmentGuard env_guard(&argc, argv);

Expand Down Expand Up @@ -68,9 +71,20 @@ int main(int argc, char** argv) {
LOG(INFO) << absl::Substitute("Pixie PEM. Version: $0, id: $1, kernel version: $2",
px::VersionInfo::VersionString(), agent_id.str(),
kernel_version.ToString());
auto manager =
PEMManager::Create(agent_id, FLAGS_pod_name, FLAGS_host_ip, FLAGS_nats_url, kernel_version)
.ConsumeValueOrDie();

auto kernel_headers_installed = false;
auto uname = px::system::GetUname();
if (uname.ok()) {
const auto host_path = px::system::Config::GetInstance().ToHostPath(absl::Substitute("$0/$1/$2", kLinuxHeadersPath, uname.ConsumeValueOrDie(), "build"));

const auto resolved_host_path = px::system::ResolvePossibleSymlinkToHostPath(host_path);
LOG(WARNING) << "Resolved host path: " << resolved_host_path.ToString();
kernel_headers_installed = resolved_host_path.ok();
}

auto manager = PEMManager::Create(agent_id, FLAGS_pod_name, FLAGS_host_ip, FLAGS_nats_url,
kernel_version, kernel_headers_installed)
.ConsumeValueOrDie();

TerminationHandler::set_manager(manager.get());

Expand Down
9 changes: 5 additions & 4 deletions src/vizier/services/agent/pem/pem_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,19 @@ class PEMManager : public Manager {
protected:
PEMManager() = delete;
PEMManager(sole::uuid agent_id, std::string_view pod_name, std::string_view host_ip,
std::string_view nats_url, px::system::KernelVersion kernel_version)
std::string_view nats_url, px::system::KernelVersion kernel_version,
bool kernel_headers_installed)
: PEMManager(agent_id, pod_name, host_ip, nats_url,
px::stirling::Stirling::Create(px::stirling::CreateSourceRegistryFromFlag()),
kernel_version) {}
kernel_version, kernel_headers_installed) {}

// Constructor which creates the HostInfo for an agent (runs once per node).
PEMManager(sole::uuid agent_id, std::string_view pod_name, std::string_view host_ip,
std::string_view nats_url, std::unique_ptr<stirling::Stirling> stirling,
px::system::KernelVersion kernel_version)
px::system::KernelVersion kernel_version, bool kernel_headers_installed)
: Manager(agent_id, pod_name, host_ip, /*grpc_server_port*/ 0, PEMManager::Capabilities(),
PEMManager::Parameters(), nats_url,
/*mds_url*/ "", kernel_version),
/*mds_url*/ "", kernel_version, kernel_headers_installed),
stirling_(std::move(stirling)),
node_available_memory_(prometheus::BuildGauge()
.Name("node_available_memory")
Expand Down
1 change: 1 addition & 0 deletions src/vizier/services/agent/shared/base/info.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct Info {
std::string pod_name;
std::string host_ip;
system::KernelVersion kernel_version;
bool kernel_headers_installed = false;
services::shared::agent::AgentCapabilities capabilities;
services::shared::agent::AgentParameters parameters;
};
Expand Down
4 changes: 3 additions & 1 deletion src/vizier/services/agent/shared/manager/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ std::shared_ptr<services::metadata::CronScriptStoreService::Stub> CreateCronScri
Manager::Manager(sole::uuid agent_id, std::string_view pod_name, std::string_view host_ip,
int grpc_server_port, services::shared::agent::AgentCapabilities capabilities,
services::shared::agent::AgentParameters parameters, std::string_view nats_url,
std::string_view mds_url, system::KernelVersion kernel_version)
std::string_view mds_url, system::KernelVersion kernel_version,
bool kernel_headers_installed)
: grpc_channel_creds_(SSL::DefaultGRPCClientCreds()),
time_system_(std::make_unique<px::event::RealTimeSystem>()),
api_(std::make_unique<px::event::APIImpl>(time_system_.get())),
Expand Down Expand Up @@ -135,6 +136,7 @@ Manager::Manager(sole::uuid agent_id, std::string_view pod_name, std::string_vie
info_.pod_name = std::string(pod_name);
info_.host_ip = std::string(host_ip);
info_.kernel_version = kernel_version;
info_.kernel_headers_installed = kernel_headers_installed;
}

Status Manager::Init() {
Expand Down
3 changes: 2 additions & 1 deletion src/vizier/services/agent/shared/manager/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ class Manager : public BaseManager {
Manager(sole::uuid agent_id, std::string_view pod_name, std::string_view host_ip,
int grpc_server_port, services::shared::agent::AgentCapabilities capabilities,
services::shared::agent::AgentParameters parameters, std::string_view nats_url,
std::string_view mds_url, system::KernelVersion kernel_version);
std::string_view mds_url, system::KernelVersion kernel_version,
bool kernel_headers_installed);
Status Init();

Status RegisterMessageHandler(MsgCase c, std::shared_ptr<MessageHandler> handler,
Expand Down
1 change: 1 addition & 0 deletions src/vizier/services/agent/shared/manager/registration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Status RegistrationHandler::DispatchRegistration() {
host_info->set_host_ip(agent_info()->host_ip);
auto kernel_version_proto = KernelToProto(agent_info()->kernel_version);
host_info->mutable_kernel()->CopyFrom(kernel_version_proto);
host_info->set_kernel_headers_installed(agent_info()->kernel_headers_installed);
*req_info->mutable_capabilities() = agent_info()->capabilities;
*req_info->mutable_parameters() = agent_info()->parameters;

Expand Down
2 changes: 2 additions & 0 deletions src/vizier/services/shared/agentpb/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ message HostInfo {
string host_ip = 3 [ (gogoproto.customname) = "HostIP" ];
// Version of the kernel running on the host.
KernelVersion kernel = 4;
// Whether kernel headers were preinstalled on the host.
bool kernel_headers_installed = 5;
}

// Agent contains information about a specific agent instance.
Expand Down

0 comments on commit 779cdf4

Please sign in to comment.