Message Distribution Issues with Kafka brokers: Impact on Pod Scaling #8257

naveengogu opened this issue Oct 16, 2024 · 3 comments

@naveengogu

naveengogu commented Oct 16, 2024

Expected Behavior

If I send 9 messages to the Kafka brokers, they should be distributed across the 3 partitions of the Kafka topic, and 3 pods should scale up, since I set the concurrency to 1. Each pod just sleeps for 10 minutes and then prints the message.

Actual Behavior

When I send a message to the Kafka broker URL, it is added to a single partition of the Kafka topic. I have 3 partitions, but because of this my pods never scale beyond one, regardless of the number of messages.

@pierDipi
Member

@naveengogu can you show an example event you're sending to the Kafka brokers? Also, what version are you using?

@pierDipi added the triage/needs-user-input label Oct 24, 2024
@naveengogu
Author

@pierDipi I am using version 1.15.2, and this is a sample curl command:

curl -X POST http://kafka-broker-ingress.knative-eventing.svc.cluster.local/raga/test-sleep \
  -H "content-type: application/json" \
  -H "ce-specversion: 1.0" \
  -H "ce-source: my/curl/command" \
  -H "ce-type: my.demo.event" \
  -H "ce-id: 0815" \
  -d '{"message": "hello1"}'

I have deleted the broker, service, and trigger, then redeployed them. Now the messages are being distributed across multiple partitions, but only one pod is consuming all the messages. The pods are not scaling based on events. My containerConcurrency is set to 1.

What could be causing the pods not to scale? When I hit the Knative service directly, it scales, but it doesn't scale when I send messages through the brokers.
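
To make the scaling visible while sending events, I watch the service's pods with something like this (assuming the Knative Service is also named test-sleep in the raga namespace; the path above is <namespace>/<broker>, so substitute the real service name if it differs):

# serving.knative.dev/service is the label Knative Serving puts on each revision's pods.
kubectl get pods -n raga -l serving.knative.dev/service=test-sleep -w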

I also found this error in the kafka-controller; could it be causing the issue?

{"level":"error","ts":"2024-10-24T08:04:09.576Z","logger":"kafka-broker-controller","caller":"statefulset/autoscaler.go:168","msg":"Failed to autoscale","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-6468764d4c-vbq8l","error":"statefulsets.apps \"kafka-channel-dispatcher\" not found","stacktrace":"knative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).syncAutoscale.func1\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:168\nk8s.io/apimachinery/pkg/util/wait.Poll.ConditionFunc.WithContext.func1\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:109\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:154\nk8s.io/apimachinery/pkg/util/wait.waitForWithContext\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:207\nk8s.io/apimachinery/pkg/util/wait.poll\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:260\nk8s.io/apimachinery/pkg/util/wait.PollWithContext\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:85\nk8s.io/apimachinery/pkg/util/wait.Poll\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:66\nknative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).syncAutoscale\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:165\nknative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).Start\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:145\nknative.dev/eventing/pkg/scheduler/statefulset.New.func2\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/scheduler.go:106"}

@naveengogu
Author

A few more logs:

"level":"info","ts":"2024-10-25T10:34:00.637Z","logger":"kafka-broker-controller","caller":"state/state.go:194","msg":"failed to get statefulset","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-84cf6b879d-g5c79","error":"statefulsets.apps \"kafka-channel-dispatcher\" not found"}
{"level":"info","ts":"2024-10-25T10:34:00.637Z","logger":"kafka-broker-controller","caller":"statefulset/autoscaler.go:182","msg":"error while refreshing scheduler state (will retry){error 26 0  statefulsets.apps \"kafka-channel-dispatcher\" not found}","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-84cf6b879d-g5c79","component":"autoscaler"}
{"level":"error","ts":"2024-10-25T10:34:00.637Z","logger":"kafka-broker-controller","caller":"statefulset/autoscaler.go:168","msg":"Failed to autoscale","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-84cf6b879d-g5c79","error":"statefulsets.apps \"kafka-channel-dispatcher\" not found","stacktrace":"knative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).syncAutoscale.func1\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:168\nk8s.io/apimachinery/pkg/util/wait.Poll.ConditionFunc.WithContext.func1\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:109\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:154\nk8s.io/apimachinery/pkg/util/wait.waitForWithContext\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:207\nk8s.io/apimachinery/pkg/util/wait.poll\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:260\nk8s.io/apimachinery/pkg/util/wait.PollWithContext\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:85\nk8s.io/apimachinery/pkg/util/wait.Poll\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:66\nknative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).syncAutoscale\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:165\nknative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).Start\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:145\nknative.dev/eventing/pkg/scheduler/statefulset.New.func2\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/scheduler.go:106"}
{"level":"info","ts":"2024-10-25T10:34:01.136Z","logger":"kafka-broker-controller","caller":"state/state.go:194","msg":"failed to get statefulset","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-84cf6b879d-g5c79","error":"statefulsets.apps \"kafka-channel-dispatcher\" not found"}
{"level":"info","ts":"2024-10-25T10:34:01.136Z","logger":"kafka-broker-controller","caller":"statefulset/autoscaler.go:182","msg":"error while refreshing scheduler state (will retry){error 26 0  statefulsets.apps \"kafka-channel-dispatcher\" not found}","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-84cf6b879d-g5c79","component":"autoscaler"}
{"level":"error","ts":"2024-10-25T10:34:01.136Z","logger":"kafka-broker-controller","caller":"statefulset/autoscaler.go:168","msg":"Failed to autoscale","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-84cf6b879d-g5c79","error":"statefulsets.apps \"kafka-channel-dispatcher\" not found","stacktrace":"knative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).syncAutoscale.func1\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:168\nk8s.io/apimachinery/pkg/util/wait.Poll.ConditionFunc.WithContext.func1\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:109\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:154\nk8s.io/apimachinery/pkg/util/wait.waitForWithContext\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:207\nk8s.io/apimachinery/pkg/util/wait.poll\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:260\nk8s.io/apimachinery/pkg/util/wait.PollWithContext\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:85\nk8s.io/apimachinery/pkg/util/wait.Poll\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:66\nknative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).syncAutoscale\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:165\nknative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).Start\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:145\nknative.dev/eventing/pkg/scheduler/statefulset.New.func2\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/scheduler.go:106"}
{"level":"info","ts":"2024-10-25T10:34:01.636Z","logger":"kafka-broker-controller","caller":"state/state.go:194","msg":"failed to get statefulset","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-84cf6b879d-g5c79","error":"statefulsets.apps \"kafka-channel-dispatcher\" not found"}
{"level":"info","ts":"2024-10-25T10:34:01.636Z","logger":"kafka-broker-controller","caller":"statefulset/autoscaler.go:182","msg":"error while refreshing scheduler state (will retry){error 26 0  statefulsets.apps \"kafka-channel-dispatcher\" not found}","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-84cf6b879d-g5c79","component":"autoscaler"}
{"level":"error","ts":"2024-10-25T10:34:01.636Z","logger":"kafka-broker-controller","caller":"statefulset/autoscaler.go:168","msg":"Failed to autoscale","commit":"a7715ef-dirty","knative.dev/pod":"kafka-controller-84cf6b879d-g5c79","error":"statefulsets.apps \"kafka-channel-dispatcher\" not found","stacktrace":"knative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).syncAutoscale.func1\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:168\nk8s.io/apimachinery/pkg/util/wait.Poll.ConditionFunc.WithContext.func1\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:109\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:154\nk8s.io/apimachinery/pkg/util/wait.waitForWithContext\n\tk8s.io/[email protected]/pkg/util/wait/wait.go:207\nk8s.io/apimachinery/pkg/util/wait.poll\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:260\nk8s.io/apimachinery/pkg/util/wait.PollWithContext\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:85\nk8s.io/apimachinery/pkg/util/wait.Poll\n\tk8s.io/[email protected]/pkg/util/wait/poll.go:66\nknative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).syncAutoscale\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:165\nknative.dev/eventing/pkg/scheduler/statefulset.(*autoscaler).Start\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/autoscaler.go:145\nknative.dev/eventing/pkg/scheduler/statefulset.New.func2\n\tknative.dev/[email protected]/pkg/scheduler/statefulset/scheduler.go:106"}
