Skip to content

Commit

Permalink
Setup CI for adapter with service dependencies
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Paine <[email protected]>
  • Loading branch information
timkpaine committed Sep 10, 2024
1 parent 1ce6f25 commit 21229ed
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 236 deletions.
66 changes: 63 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,6 @@ jobs:
env:
CSP_TEST_SKIP_EXAMPLES: "1"


####################################################
#..................................................#
#..|########|..|########|..../####\....|########|..#
Expand Down Expand Up @@ -736,7 +735,68 @@ jobs:
###########################################################################################################
# Test Service Adapters #
###########################################################################################################
# Coming soon!
test_adapters:
needs:
- initialize
- build

strategy:
matrix:
os:
- ubuntu-24.04
python-version:
- 3.11
adapter:
- kafka

runs-on: ${{ matrix.os }}

steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive

- name: Set up Python ${{ matrix.python-version }}
uses: ./.github/actions/setup-python
with:
version: '${{ matrix.python-version }}'

- name: Install python dependencies
run: make requirements

- name: Install test dependencies
shell: bash
run: sudo apt-get install graphviz

# Download artifact
- name: Download wheel
uses: actions/download-artifact@v4
with:
name: csp-dist-${{ runner.os }}-${{ runner.arch }}-${{ matrix.python-version }}

- name: Install wheel
run: |
python -m pip install -U *manylinux*.whl
python -m pip install -U --no-deps *manylinux*.whl --target .
- name: Spin up adapter service
run: make dockerup ADAPTER=${{ matrix.adapter }} DOCKERARGS="--wait --wait-timeout 30"

- name: Wait a few seconds after docker images have been spun up
run: sleep 30

# Run tests
- name: Setup test flags
shell: bash
run: echo "CSP_TEST_$( echo ${{ matrix.adapter }} | awk '{print toupper($0)}' )=1" >> $GITHUB_ENV

- name: Python Test Steps
run: make test-py TEST_ARGS="-k ${{ matrix.adapter }}"

- name: Spin down adapter service
run: make dockerdown ADAPTER=${{ matrix.adapter }}
if: ${{ always() }}

############################################################################################
#..........................................................................................#
Expand All @@ -751,7 +811,6 @@ jobs:
############################################################################################
# Upload Release Artifacts #
############################################################################################

# only publish artifacts on tags, but otherwise this always runs
# Note this whole workflow only triggers on release tags (e.g. "v0.1.0")
publish_release_artifacts:
Expand All @@ -763,6 +822,7 @@ jobs:
- test
- test_sdist
- test_dependencies
- test_adapters

if: startsWith(github.ref, 'refs/tags/v')
runs-on: ubuntu-24.04
Expand Down
13 changes: 7 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,22 @@ tests: test

.PHONY: dockerup dockerps dockerdown initpodmanmac
ADAPTER := kafka
DOCKER := podman
DOCKER_COMPOSE := docker compose # or podman-compose
DOCKERARGS :=

initpodmanmac:
podman machine stop
podman machine set --cpus 4 --memory 8096
podman machine start

dockerup: ## spin up docker compose services for adapter testing
$(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml up -d
$(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml up -d $(DOCKERARGS)

dockerps: ## spin up docker compose services for adapter testing
$(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml ps
dockerps: ## get status of current docker compose services
$(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml ps

dockerdown: ## spin up docker compose services for adapter testing
$(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml down
dockerdown: ## spin down docker compose services for adapter testing
$(DOCKER_COMPOSE) -f ci/$(ADAPTER)/docker-compose.yml down

###########
# VERSION #
Expand Down
193 changes: 26 additions & 167 deletions ci/kafka/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,181 +1,40 @@
# https://docs.confluent.io/platform/current/platform-quickstart.html
# https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.5.3-post/cp-all-in-one-kraft/docker-compose.yml
# https://github.com/conduktor/kafka-stack-docker-compose
---
version: '2'
version: '2.1'

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.3
hostname: zookeeper
container_name: zookeeper
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888

broker:
image: confluentinc/cp-server:7.5.3
hostname: broker
container_name: broker
depends_on:
- zookeeper
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "9101:9101"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
TOPIC_AUTO_CREATE: 'true'

schema-registry:
image: confluentinc/cp-schema-registry:7.5.3
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

connect:
image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.3.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

control-center:
image: confluentinc/cp-enterprise-control-center:7.5.3
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
- ksqldb-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021

ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.5.3
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

# ksqldb-cli:
# image: confluentinc/cp-ksqldb-cli:7.5.3
# container_name: ksqldb-cli
# depends_on:
# - broker
# - connect
# - ksqldb-server
# entrypoint: /bin/sh
# tty: true

# ksql-datagen:
# image: confluentinc/ksqldb-examples:7.5.3
# hostname: ksql-datagen
# container_name: ksql-datagen
# depends_on:
# - ksqldb-server
# - broker
# - schema-registry
# - connect
# command: "bash -c 'echo Waiting for Kafka to be ready... && \
# cub kafka-ready -b broker:29092 1 40 && \
# echo Waiting for Confluent Schema Registry to be ready... && \
# cub sr-ready schema-registry 8081 40 && \
# echo Waiting a few seconds for topic creation to finish... && \
# sleep 11 && \
# tail -f /dev/null'"
# environment:
# KSQL_CONFIG_DIR: "/etc/ksql"
# STREAMS_BOOTSTRAP_SERVERS: broker:29092
# STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
# STREAMS_SCHEMA_REGISTRY_PORT: 8081

rest-proxy:
image: confluentinc/cp-kafka-rest:7.5.3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
- zoo1
1 change: 0 additions & 1 deletion conda/dev-environment-unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ dependencies:
- flex
- graphviz
- gtest
- httpx>=0.20,<1
- isort>=5,<6
- libarrow=16
- libboost>=1.80.0
Expand Down
1 change: 0 additions & 1 deletion conda/dev-environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ dependencies:
- exprtk
- graphviz
- gtest
- httpx>=0.20,<1
- isort>=5,<6
- libarrow=16
- libboost>=1.80.0
Expand Down
14 changes: 4 additions & 10 deletions csp/adapters/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,12 @@
import csp
from csp import ts
from csp.adapters.status import Status
from csp.adapters.utils import (
BytesMessageProtoMapper,
DateTimeType,
JSONTextMessageMapper,
MsgMapper,
RawBytesMessageMapper,
RawTextMessageMapper,
)
from csp.adapters.utils import MsgMapper
from csp.impl.wiring import input_adapter_def, output_adapter_def, status_adapter_def
from csp.lib import _kafkaadapterimpl

_ = BytesMessageProtoMapper, DateTimeType, JSONTextMessageMapper, RawBytesMessageMapper, RawTextMessageMapper
__all__ = ("KafkaStatusMessageType", "KafkaStartOffset", "KafkaAdapterManager")

T = TypeVar("T")


Expand Down Expand Up @@ -73,7 +67,7 @@ def __init__(

consumer_properties = {
"group.id": group_id,
# To get end of parition notification for live / not live flag
# To get end of partition notification for live / not live flag
"enable.partition.eof": "true",
}

Expand Down
5 changes: 2 additions & 3 deletions csp/tests/adapters/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@

@pytest.fixture(scope="module", autouse=True)
def kafkabroker():
# Defined in ci/kafka/docker-compose.yml
return "localhost:9092"


@pytest.fixture(scope="module", autouse=True)
def kafkaadapter(kafkabroker):
group_id = "group.id123"
_kafkaadapter = KafkaAdapterManager(
broker=kafkabroker, group_id=group_id, rd_kafka_conf_options={"allow.auto.create.topics": "true"}
)
_kafkaadapter = KafkaAdapterManager(broker=kafkabroker, group_id=group_id)
return _kafkaadapter
Loading

0 comments on commit 21229ed

Please sign in to comment.