Skip to content

Commit

Permalink
Merge pull request #46 from adjust/pad_336-bootstrap_servers
Browse files Browse the repository at this point in the history
`rd_kafka_brokers_add()` was deprecated:
confluentinc/librdkafka#3211

Instead of `rd_kafka_brokers_add()` it is necessary to use `rd_kafka_conf_set()`:
https://github.com/confluentinc/librdkafka/blob/49f05db36e5bff78e856e33da951a5f998f9d55b/examples/rdkafka_complex_consumer_example.c#L494
  • Loading branch information
za-arthur authored Aug 7, 2023
2 parents 8c1f273 + 30b2777 commit 2615504
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 38 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ name: CI

on:
push:
branches: ['*']
branches:
- master
- main
pull_request:
branches: ['*']

jobs:
test:
strategy:
fail-fast: false
matrix:
pg: [14, 13, 12, 11, 10, 9.6, 9.5]
pg: [15, 14, 13, 12, 11, 10]
name: PostgreSQL ${{ matrix.pg }}
runs-on: ubuntu-latest
container: zilder/pg-ext-check
container: pgxn/pgxn-tools
steps:
# Install and run postgres
- run: pg-setup ${{ matrix.pg }}
- run: pg-start ${{ matrix.pg }}

# Install packages
- run: mkdir -p /usr/share/man/man1
Expand All @@ -32,7 +33,7 @@ jobs:
working-directory: ./test

# Build kafka_fdw and run tests
- run: build-check
- run: pg-build-test
env:
KAFKA_PRODUCER: "/kafka/bin/kafka-console-producer.sh"
KAFKA_TOPICS: "/kafka/bin/kafka-topics.sh"
21 changes: 17 additions & 4 deletions .github/workflows/ci_dockerfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,33 @@ name: build

on:
push:
branches: ['*']
branches:
- master
- main
pull_request:
branches: ['*']

jobs:
test:
strategy:
fail-fast: false
matrix:
pg: [14, 13, 12, 11, 10, 9.6]
include:
- clang: 15
pg: 15
- clang: 15
pg: 14
- clang: 15
pg: 13
- clang: 15
pg: 12
- clang: 15
pg: 11
- clang: none
pg: 10
name: PostgreSQL ${{ matrix.pg }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: docker build --tag tests --build-arg="PG_VERSION=${{ matrix.pg }}" .
- run: docker build --tag tests --build-arg="PG_VERSION=${{ matrix.pg }}" --target="clang-${{ matrix.clang }}" .
- run: docker run --rm tests

27 changes: 18 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
ARG PG_VERSION
FROM postgres:${PG_VERSION}-alpine
FROM postgres:${PG_VERSION}-alpine AS base

# Environment
ENV LANG=C.UTF-8
ENV REPO=/repo
ENV KAFKA_PRODUCER="/kafka/bin/kafka-console-producer.sh"
ENV KAFKA_TOPICS="/kafka/bin/kafka-topics.sh"

# Install dependencies
RUN apk --no-cache add make musl-dev gcc clang llvm util-linux-dev wget librdkafka-dev openjdk8-jre;

# Make postgres directories writable
RUN chmod a+rwx /usr/local/lib/postgresql && \
chmod a+rwx /usr/local/lib/postgresql/bitcode || true && \
chmod a+rwx /usr/local/share/postgresql/extension

chmod a+rwx /usr/local/share/postgresql/extension && \
# Make directories
RUN mkdir -p $REPO && \
mkdir -p $REPO && \
mkdir /kafka && \
chmod a+rwx /kafka

Expand All @@ -25,10 +21,23 @@ ADD . $REPO
RUN chown -R postgres:postgres $REPO
WORKDIR $REPO

USER postgres

# Expose zookeeper and kafka ports (may be useful for local debug)
EXPOSE 2181 9092

# clang-none target
FROM base AS clang-none

# Install dependencies
RUN apk --no-cache add make musl-dev gcc util-linux-dev wget librdkafka-dev openjdk8-jre;
USER postgres

ENTRYPOINT ["/repo/test/run_tests.sh"]

# clang-15 target
FROM base AS clang-15

# Install dependencies
RUN apk --no-cache add make musl-dev gcc clang15 llvm15 util-linux-dev wget librdkafka-dev openjdk8-jre;
USER postgres

ENTRYPOINT ["/repo/test/run_tests.sh"]
7 changes: 0 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ ifeq ($(shell test $(VERSION_NUM) -lt 100000; echo $$?),0)
REGRESS := $(filter-out parallel, $(REGRESS))
endif

ifeq ($(shell test $(VERSION_NUM) -ge 90600; echo $$?),0)
PGOPTIONS+= "--max_parallel_workers_per_gather=0"
endif


PLATFORM = $(shell uname -s)

Expand All @@ -53,9 +49,6 @@ all: $(EXTENSION)--$(EXTVERSION).sql
$(EXTENSION)--$(EXTVERSION).sql: sql/$(EXTENSION).sql
cp $< $@

installcheck: submake $(REGRESS_PREP)
PGOPTIONS=$(PGOPTIONS) $(pg_regress_installcheck) $(REGRESS_OPTS) $(REGRESS)

prep_kafka:
./test/init_kafka.sh

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ see `test/init_kafka.sh`

## Usage

CREATE SERVER must specifie a brokerlist using option `brokers`
CREATE SERVER must specify a brokerlist using option `brokers`
```SQL
CREATE SERVER kafka_server
FOREIGN DATA WRAPPER kafka_fdw
Expand Down
13 changes: 4 additions & 9 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@ KafkaFdwGetConnection(KafkaOptions *k_options,

conf = rd_kafka_conf_new();

if (rd_kafka_conf_set(conf, "bootstrap.servers", k_options->brokers,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
elog(ERROR, "%s\n", errstr);

*kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, KAFKA_MAX_ERR_MSG);

if (*kafka_handle != NULL)
{
/* Add brokers */
/* Check if exactly 1 broker was added */
if (rd_kafka_brokers_add(*kafka_handle, k_options->brokers) < 1)
{
rd_kafka_destroy(*kafka_handle);
elog(ERROR, "No valid brokers specified");
*kafka_handle = NULL;
}

/* Create topic handle */
topic_conf = rd_kafka_topic_conf_new();

Expand Down
1 change: 1 addition & 0 deletions test/expected/kafka_test.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
\i test/sql/setup.inc
\set ECHO none
SET max_parallel_workers_per_gather = 0;
/*
* Returns EXPLAIN ANALYZE result without any arbitrary numbers like costs
* or execution time.
Expand Down
2 changes: 2 additions & 0 deletions test/expected/parallel.out
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ ALTER FOREIGN TABLE kafka_test_part OPTIONS(ADD num_partitions '4');
set max_parallel_workers_per_gather=2;
set max_parallel_workers=8;
set parallel_setup_cost=0;
set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;
ANALYZE kafka_test_part;
EXPLAIN (COSTS OFF) SELECT * FROM kafka_test_part ;
QUERY PLAN
Expand Down
4 changes: 2 additions & 2 deletions test/run_kafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
#
set -ex

KAFKA_VERSION=2.8.1
KAFKA_VERSION=2.8.2
KAFKA_ARCHIVE=kafka_2.13-${KAFKA_VERSION}.tgz

DIST=$(cat /etc/os-release | grep ^ID= | sed s/ID=//)

echo

# Download Apache Kafka
wget https://downloads.apache.org/kafka/${KAFKA_VERSION}/${KAFKA_ARCHIVE}
wget https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/${KAFKA_ARCHIVE}
tar -xzf ${KAFKA_ARCHIVE} -C /kafka --strip-components=1
export PATH="/kafka/bin/:$PATH"

Expand Down
2 changes: 2 additions & 0 deletions test/sql/kafka_test.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
\i test/sql/setup.inc

SET max_parallel_workers_per_gather = 0;

/*
* Returns EXPLAIN ANALYZE result without any arbitrary numbers like costs
* or execution time.
Expand Down
2 changes: 2 additions & 0 deletions test/sql/parallel.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ ALTER FOREIGN TABLE kafka_test_part OPTIONS(ADD num_partitions '4');
set max_parallel_workers_per_gather=2;
set max_parallel_workers=8;
set parallel_setup_cost=0;
set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;

ANALYZE kafka_test_part;
EXPLAIN (COSTS OFF) SELECT * FROM kafka_test_part ;
Expand Down

0 comments on commit 2615504

Please sign in to comment.