Skip to content

Commit

Permalink
ADD AMQP bpf test setup
Browse files Browse the repository at this point in the history
Summary: Creates a demo app running rabbitmq to test amqp bpf

Test Plan: This can be tested via the full BPF test in another PR

Reviewers: #third_party_approvers, #stirling, yzhao, vihang

Reviewed By: #third_party_approvers, #stirling, yzhao, vihang

Subscribers: vihang, rcheng, yzhao

Signed-off-by: Vikranth Srivatsa <[email protected]>

Differential Revision: https://phab.corp.pixielabs.ai/D11791

GitOrigin-RevId: 8045746
  • Loading branch information
vikranth22446 authored and copybaranaut committed Aug 1, 2022
1 parent 9aa64f2 commit 8966ea0
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 2 deletions.
21 changes: 19 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,28 @@ go_download_sdk(
version = "1.17.11",
)


pip_parse(
name = "amqp_gen_reqs",
requirements_lock = "//src/stirling/source_connectors/socket_tracer/protocols/amqp/amqp_code_generator:requirements.txt",
)

load("@amqp_gen_reqs//:requirements.bzl", "install_deps")
load("@amqp_gen_reqs//:requirements.bzl", amp_gen_install_deps = "install_deps")

amp_gen_install_deps()

load(
"@io_bazel_rules_docker//python3:image.bzl",
py_image_repos = "repositories",
)

py_image_repos()

pip_parse(
name = "amqp_bpf_test_requirements",
requirements_lock = "//src/stirling/source_connectors/socket_tracer/testing/containers/amqp:requirements.txt",
)

load("@amqp_bpf_test_requirements//:requirements.bzl", ampq_bpf_test_install_deps = "install_deps")

install_deps()
ampq_bpf_test_install_deps()
8 changes: 8 additions & 0 deletions bazel/container_images.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,11 @@ def stirling_test_images():
"sha256:b0616a801a0f3c17c437c67c49e20c76c8735e205cdc165e56ae4fa867f32af1",
"pixie-oss/pixie-dev-public/docker-deps/node",
)

# Tag: rabbitmq:3-management
# Arch: linux/amd64
_gcr_io_image(
"rabbitmq_3_management",
"sha256:650c7e0093842739ddfaadec0d45946c052dba42941bd5c0a082cbe914451c25",
"pixie-oss/demo-apps/rabbitmq/rabbitmq:3-management",
)
13 changes: 13 additions & 0 deletions src/stirling/source_connectors/socket_tracer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,16 @@ pl_cc_test(
"//src/stirling/testing:cc_library",
],
)

pl_cc_test(
name = "amqp_trace_bpf_test",
timeout = "moderate",
srcs = ["amqp_bpf_test.cc"],
tags = ["requires_bpf"],
deps = [
":cc_library",
"//src/common/testing/test_utils:cc_library",
"//src/stirling/source_connectors/socket_tracer/testing:cc_library",
"//src/stirling/source_connectors/socket_tracer/testing:container_images",
],
)
100 changes: 100 additions & 0 deletions src/stirling/source_connectors/socket_tracer/amqp_bpf_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2018- The Pixie Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <regex>
#include <string>

#include <absl/strings/str_replace.h>

#include "src/common/base/base.h"
#include "src/common/exec/exec.h"
#include "src/common/testing/test_utils/container_runner.h"
#include "src/common/testing/testing.h"
#include "src/stirling/source_connectors/socket_tracer/testing/container_images.h"
#include "src/stirling/source_connectors/socket_tracer/testing/socket_trace_bpf_test_fixture.h"
#include "src/stirling/testing/common.h"

namespace px {
namespace stirling {

using ::px::stirling::testing::FindRecordIdxMatchesPID;
using ::px::stirling::testing::SocketTraceBPFTestFixture;
using ::px::testing::BazelRunfilePath;
using ::testing::AllOf;
using ::testing::Contains;
using ::testing::Eq;
using ::testing::Field;
using ::testing::HasSubstr;
using ::testing::StrEq;
using ::px::operator<<;

class AMQPTraceTest : public SocketTraceBPFTestFixture</* TClientSideTracing */ true> {
protected:
StatusOr<int32_t> GetPIDFromOutput(std::string_view out) {
std::vector<std::string_view> lines = absl::StrSplit(out, "\n");
if (lines.empty()) {
return error::Internal("Executed output (pid) from command.");
}

int32_t client_pid;
if (!absl::SimpleAtoi(lines[0], &client_pid)) {
return error::Internal("Could not extract PID.");
}

return client_pid;
}

void RunAll() {
rabbitmq_server_.Run(std::chrono::seconds{120});
rabbitmq_consumer_.Run(
std::chrono::seconds{120},
{absl::Substitute("--network=container:$0", rabbitmq_server_.container_name())});

rabbitmq_producer_.Run(
std::chrono::seconds{120},
{absl::Substitute("--network=container:$0", rabbitmq_server_.container_name())});
}

::px::stirling::testing::RabbitMQConsumer rabbitmq_consumer_;
::px::stirling::testing::RabbitMQProducer rabbitmq_producer_;
::px::stirling::testing::RabbitMQContainer rabbitmq_server_;
};

struct AMQPTraceRecord {
int64_t ts_ns = 0;
std::string frame_type;
std::string class_id;
std::string method_id;
std::string channel;

std::string ToString() const {
return absl::Substitute("ts_ns=$0 frame_type=$1 class_id=$2 method_id=$3 channel=$4", ts_ns,
frame_type, class_id, method_id, channel);
}
};

TEST_F(AMQPTraceTest, AMQPCapture) {
StartTransferDataThread();
RunAll();
StopTransferDataThread();
}

} // namespace stirling
} // namespace px
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pl_cc_test_library(
hdrs = ["container_images.h"],
data = [
"//src/stirling/source_connectors/socket_tracer/protocols/nats/testing:nats_test_client_with_ca_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers:amqp_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers:curl_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers:datastax_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers:dns_image.tar",
Expand All @@ -58,6 +59,8 @@ pl_cc_test_library(
"//src/stirling/source_connectors/socket_tracer/testing/containers:redis_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers:ruby_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers:zookeeper_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers/amqp:consumer_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers/amqp:producer_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers/pgsql:demo_client_image.tar",
"//src/stirling/source_connectors/socket_tracer/testing/containers/thriftmux:server_image.tar",
"//src/stirling/testing/demo_apps/go_grpc_tls_pl/client:golang_1_16_grpc_tls_client.tar",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,49 @@ class RedisContainer : public ContainerRunner {
static constexpr std::string_view kReadyMessage = "# Server initialized";
};

//-----------------------------------------------------------------------------
// RabbitMQ
//-----------------------------------------------------------------------------

class RabbitMQContainer : public ContainerRunner {
public:
RabbitMQContainer()
: ContainerRunner(::px::testing::BazelRunfilePath(kBazelImageTar), kContainerNamePrefix,
kReadyMessage) {}

private:
static constexpr std::string_view kBazelImageTar =
"src/stirling/source_connectors/socket_tracer/testing/containers/amqp_image.tar";
static constexpr std::string_view kReadyMessage = "Server startup complete";
static constexpr std::string_view kContainerNamePrefix = "amqp_server";
};

class RabbitMQProducer : public ContainerRunner {
public:
RabbitMQProducer()
: ContainerRunner(::px::testing::BazelRunfilePath(kBazelImageTar), kContainerNamePrefix,
kReadyMessage) {}

private:
static constexpr std::string_view kBazelImageTar =
"src/stirling/source_connectors/socket_tracer/testing/containers/amqp/producer_image.tar";
static constexpr std::string_view kReadyMessage = "Starting AMQP producer";
static constexpr std::string_view kContainerNamePrefix = "amqp_producer";
};

class RabbitMQConsumer : public ContainerRunner {
public:
RabbitMQConsumer()
: ContainerRunner(::px::testing::BazelRunfilePath(kBazelImageTar), kContainerNamePrefix,
kReadyMessage) {}

private:
static constexpr std::string_view kBazelImageTar =
"src/stirling/source_connectors/socket_tracer/testing/containers/amqp/consumer_image.tar";
static constexpr std::string_view kReadyMessage = "Starting AMQP consumer";
static constexpr std::string_view kContainerNamePrefix = "amqp_consumer";
};

//-----------------------------------------------------------------------------
// Cassandra
//-----------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,8 @@ node_http_server_image(
name = "node_14_18_1_alpine_image",
base = "@node_14_18_1_alpine_amd64_image//image",
)

container_image(
name = "amqp_image",
base = "@rabbitmq_3_management//image",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2018- The Pixie Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

load("@amqp_bpf_test_requirements//:requirements.bzl", "requirement")
load("@io_bazel_rules_docker//python3:image.bzl", "py3_image")

package(default_visibility = ["//src/stirling:__subpackages__"])

py3_image(
name = "consumer_image",
srcs = ["consumer.py"],
main = "consumer.py",
deps = [
requirement("pika"),
],
)

py3_image(
name = "producer_image",
srcs = ["producer.py"],
main = "producer.py",
deps = [
requirement("pika"),
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2018- The Pixie Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

import pika
import sys
import os

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))


def main(queue_name="queue_1"):
channel = connection.channel()
channel.queue_declare(queue=queue_name)

def callback(ch, method, properties, body):
print("Received message")
print(" [x] Received %r" % body)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print("Starting AMQP consumer", file=sys.stderr)
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()


def main_handler(queue_name):
try:
main(queue_name)
except KeyboardInterrupt:
print("Interrupted")
try:
sys.exit(0)
except SystemExit:
os._exit(0)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2018- The Pixie Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

import pika
import time
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()


channel.queue_declare(queue="queue_1")
print("Starting AMQP producer", file=sys.stderr)

try:
while True:
new_body = "hello"
channel.basic_publish(exchange="", routing_key="queue_1", body=new_body)
time.sleep(1)
print("Completed")
finally:
connection.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pika==1.3.0

0 comments on commit 8966ea0

Please sign in to comment.