Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for RedPanda? #61

Open
slushpuppy opened this issue Dec 9, 2021 · 0 comments
Open

Support for RedPanda? #61

slushpuppy opened this issue Dec 9, 2021 · 0 comments

Comments

@slushpuppy
Copy link

  • What problem did you encounter?

PHP Fatal error: Uncaught longlang\phpkafka\Exception\KafkaErrorException: [35] The version of API is not supported. in /src/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php:385
Stack trace:
#0 /src/vendor/longlang/phpkafka/src/Util/KafkaUtil.php(69): longlang\phpkafka\Protocol\ErrorCode::check()
#1 /src/vendor/longlang/phpkafka/src/Group/GroupManager.php(111): longlang\phpkafka\Util\KafkaUtil::retry()
#2 /src/vendor/longlang/phpkafka/src/Consumer/Consumer.php(162): longlang\phpkafka\Group\GroupManager->syncGroup()
#3 /src/vendor/longlang/phpkafka/src/Consumer/Consumer.php(128): longlang\phpkafka\Consumer\Consumer->rejoin()
#4 /src/Lib/AbstractServiceEventWorker.php(53): longlang\phpkafka\Consumer\Consumer->__construct()
#5 /src/App/cli/serviceEventWorker.php(19): Lib\AbstractServiceEventWorker->start()

  • Is the Kafka environment self-built or cloud service?

apt installation

  • Please execute the following command to get environment information.

php -v & php --ri swoole & composer info | grep longlang/phpkafka
php8.0 -v & php8.0 --ri openswoole
[1] 533
PHP 8.0.13 (cli) (built: Nov 22 2021 09:50:24) ( NTS )
Copyright (c) The PHP Group
Zend Engine v4.0.13, Copyright (c) Zend Technologies
with Zend OPcache v8.0.13, Copyright (c), by Zend Technologies
with Xdebug v3.0.4, Copyright (c) 2002-2021, by Derick Rethans

openswoole

Open Swoole => enabled
Author => Open Swoole Group & Contributors [email protected]
Version => 4.8.1
Built => Dec 6 2021 17:59:05
coroutine => enabled with boost asm context
epoll => enabled
eventfd => enabled
signalfd => enabled
cpu_affinity => enabled
spinlock => enabled
rwlock => enabled
sockets => enabled
openssl => OpenSSL 1.1.1 11 Sep 2018
dtls => enabled
http2 => enabled
json => enabled
curl-native => enabled
pcre => enabled
zlib => 1.2.11
mutex_timedlock => enabled
pthread_barrier => enabled
futex => enabled
mysqlnd => enabled
async_redis => enabled

Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 8388608 => 8388608

"longlang/phpkafka": "^1.2",

# Paste here

  • Provide the smallest reproducible code:
// Your code
         $config = new ConsumerConfig();
      $addr = Config::getBroker();
      $config->setBroker($addr->toIPPortFormat());
      $config->setTopic($this->getTopic()); // topic
      $config->setClientId($this->sanitizeClassName(static::class)); // client ID. Use
      $config->setInterval(0.1);
      $config->setSsl(Config::getSSLConfig());
      $config->setGroupId($this->sanitizeClassName($this->getGroupId()));
      $config->setGroupInstanceId($this->sanitizeClassName(static::class));
      $this->config = $config;


......

        $this->consumer = new Consumer($this->config, [$this,'consume']);

        $this->consumer->start();

I am able to connect and consume topics using other php kafka libraries such as simple-kafka-client

 $builder = KafkaConsumerBuilder::create();

        $consumer = $builder->withAdditionalConfig(
            [
                // start at the very beginning of the topic when reading for the first time
                'auto.offset.reset' => 'earliest',

                // will be visible in broker logs
                'client.id' => 'php-kafka-lib-high-level-consumer',

                // SSL settings
                'security.protocol' => 'ssl',
                'ssl.ca.location' => Config::CA_CERT,
                'ssl.certificate.location' => Config::CLIENT_CERT,
                'ssl.key.location' => Config::CLIENT_KEY,

                // SASL settings
                //'sasl.mechanisms' => '',
                //'ssl.endpoint.identification.algorithm' => 'https',
                //'sasl.username' => '',
                //'sasl.password' => '',

                // Add additional output if you need to debug a problem
                // 'log_level' => (string) LOG_DEBUG,
                // 'debug' => 'all'
            ]
        )
            ->withAdditionalBroker(Config::getBroker()->toIPPortFormat())
            ->withConsumerGroup('php-kafka-lib-high-level-consumer')
            ->withSubscription('php-kafka-lib-test-topic')
            ->build();

        $consumer->subscribe();

        while (true) {
            try {
                $message = $consumer->consume(10000);
            } catch (KafkaConsumerTimeoutException|KafkaConsumerEndOfPartitionException $e) {
                echo 'Didn\'t receive any messages, waiting for more...' . PHP_EOL;
                continue;
            } catch (KafkaConsumerConsumeException $e) {
                echo $e->getMessage() . PHP_EOL;
                continue;
            }

            echo sprintf(
                    'Read message with key:%s payload:%s topic:%s partition:%d offset:%d headers:%s',
                    $message->getKey(),
                    $message->getBody(),
                    $message->getTopicName(),
                    $message->getPartition(),
                    $message->getOffset(),
                    implode(',', $message->getHeaders())
                ) . PHP_EOL;

            $consumer->commit($message);
        }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant