
Can KEDA actively consume the event itself? #5985

Closed
y805939188 opened this issue Jul 25, 2024 · 7 comments
Labels
feature-request, needs-discussion

Comments

@y805939188

y805939188 commented Jul 25, 2024

Proposal

Can KEDA actively consume the event itself? For example, when a producer inserts a message into MQ or Redis, could a KEDA component automatically pop the event? In my scenario, I don't want to poll with pollingInterval; I just want something to trigger scaling up, without a real consumer (or with KEDA itself acting as one). Is there a way to do this?
Thanks~

Use-Case

No response

Is this a feature you are interested in implementing yourself?

Maybe

Anything else?

No response

y805939188 added the feature-request and needs-discussion labels Jul 25, 2024
@JorTurFer
Member

Hello,
Do you mean whether KEDA can work as a consumer and detect the queue length instead of polling every X seconds? What is the problem with the polling?

I think that I'm missing something 😞

@y805939188
Author

y805939188 commented Aug 5, 2024

Hello, Do you mean whether KEDA can work as a consumer and detect the queue length instead of polling every X seconds? What is the problem with the polling?

I think that I'm missing something 😞

Hello @JorTurFer, thanks for your reply.

My project requires the ability to scale up and down elastically.

I have researched KEDA and Knative; both can meet my demands, but the cost of using Knative is higher than that of KEDA. I particularly like KEDA because it is very simple and easy to use.

However, in my scenario, when the Pod is scaled down to 0, I would like to wake it up simply by making an HTTP GET request. Since the community http-add-on is currently only in beta, I exposed an API in my backend service to the frontend; this API sends a message to a Redis list, and KEDA polls that list every 5 seconds. The endpoint looks roughly like the sketch below.
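For illustration, a minimal sketch of that endpoint (handler name and addresses are hypothetical; it assumes the same go-redis v8 client and the list name that appear in the scaler code later in this thread):

package main

import (
	"context"
	"net/http"

	"github.com/go-redis/redis/v8"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	// The frontend calls GET /wake; each request pushes one message onto
	// the list that the KEDA ScaledObject polls every pollingInterval.
	http.HandleFunc("/wake", func(w http.ResponseWriter, r *http.Request) {
		err := rdb.LPush(context.Background(), "nettools-test:keda:queue", "wake").Err()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	})
	http.ListenAndServe(":8080", nil)
}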

This can wake up the Pod, but I don't have a real consumer for the Redis list, so for now I can only set a timer in the backend that cleans up the Redis list every short period.

Although this approach meets my needs, it adds a lot of development cost: I have to develop a program to consume the message queue (in my case, a Redis list), along the lines of the drain loop sketched below. So I would like to ask whether KEDA could provide a feature to actively consume the queue.
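A rough sketch of that drain loop (names and the 30-second interval are assumptions, again using go-redis v8):

package backend

import (
	"context"
	"log"
	"time"

	"github.com/go-redis/redis/v8"
)

// drainLoop periodically deletes the wake-up list so stale messages do not
// keep the workload scaled up: this is the extra consumer logic the
// workaround forces the backend to maintain.
func drainLoop(ctx context.Context, rdb *redis.Client, listName string) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := rdb.Del(ctx, listName).Err(); err != nil {
				log.Printf("failed to clear %s: %v", listName, err)
			}
		}
	}
}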

Actually, I have tried using external mode to consume the Redis list directly:

type MyRedisScaler struct {
	pb.ExternalScalerServer
	redis *redis.Client
}

func NewScaler() *MyRedisScaler {
	return &MyRedisScaler{
		redis:                redisClient.Client(),
		ExternalScalerServer: pb.UnimplementedExternalScalerServer{},
	}
}

func (scaler *MyRedisScaler) IsActive(ctx context.Context, scaledObject *pb.ScaledObjectRef) (*pb.IsActiveResponse, error) {
	fmt.Println("1111111111111111111")
	listName, ok := scaledObject.ScalerMetadata["listName"]
	if !ok {
		return nil, fmt.Errorf("listName not found in metadata")
	}

	elements, err := scaler.redis.LRange(ctx, listName, 0, -1).Result()
	if err != nil {
		return nil, err
	}

	// "Consume" the queue by deleting the whole list after reading it.
	_, err = scaler.redis.Del(ctx, listName).Result()
	if err != nil {
		return nil, err
	}

	fmt.Println("1111111111111111111 list length: ", len(elements))

	return &pb.IsActiveResponse{
		Result: len(elements) > 0,
	}, nil
}

func (scaler *MyRedisScaler) StreamIsActive(_ *pb.ScaledObjectRef, sias pb.ExternalScaler_StreamIsActiveServer) error {
	return errors.New("external push is not supported")
}

func (scaler *MyRedisScaler) GetMetricSpec(ctx context.Context, scaledObject *pb.ScaledObjectRef) (*pb.GetMetricSpecResponse, error) {
	fmt.Println("2222222222222222222")
	listLength, ok := scaledObject.ScalerMetadata["listLength"]
	fmt.Println("2222222222222222222 listLength: ", listLength)
	if !ok {
		return nil, fmt.Errorf("listLength not found in metadata")
	}

	listLengthInt64, err := strconv.ParseInt(listLength, 10, 64)
	if err != nil {
		return nil, err
	}
	return &pb.GetMetricSpecResponse{
		MetricSpecs: []*pb.MetricSpec{
			{
				MetricName: "listLength",
				TargetSize: listLengthInt64,
			},
		},
	}, nil
}

func (scaler *MyRedisScaler) GetMetrics(ctx context.Context, metricRequest *pb.GetMetricsRequest) (*pb.GetMetricsResponse, error) {
	fmt.Println("3333333333333333333")
	listName, ok := metricRequest.ScaledObjectRef.ScalerMetadata["listName"]
	if !ok {
		return nil, fmt.Errorf("listName not found in metadata")
	}

	length, err := scaler.redis.LLen(ctx, listName).Result()
	if err != nil {
		fmt.Println("error: ", err)
		return nil, err
	}

	fmt.Println("3333333333333333333 length: ", length)

	return &pb.GetMetricsResponse{
		MetricValues: []*pb.MetricValue{
			{
				MetricName:  "listLength",
				MetricValue: length,
			},
		},
	}, nil
}

I placed the consumption logic in the IsActive function:

	_, err = scaler.redis.Del(ctx, listName).Result()

However, I found that although the Pod runs normally after being woken up, the messages in the list are consumed immediately in IsActive, which leads to the HPA target field always being 0/1 (avg) when KEDA updates the HPA resources:
[screenshot: HPA target showing 0/1 (avg)]

As a result, only the minReplicaCount number of replicas is ever created, and the maxReplicaCount number of replicas can never be reached. So, could KEDA add a feature that allows it to actively consume a specified number of messages from the queue when a Pod is started? Something like the pop-with-count sketch below is what I have in mind.
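To make the idea concrete, a hypothetical sketch of "consume only N messages" (it assumes Redis 6.2+, which supports LPOP with a count, and go-redis v8's LPopCount):

package scaler

import (
	"context"

	"github.com/go-redis/redis/v8"
)

// popN consumes at most n messages instead of deleting the whole list, so
// the remaining queue length stays visible to GetMetrics.
func popN(ctx context.Context, rdb *redis.Client, listName string, n int) ([]string, error) {
	vals, err := rdb.LPopCount(ctx, listName, n).Result()
	if err == redis.Nil {
		return nil, nil // list is empty or missing
	}
	return vals, err
}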

@JorTurFer
Copy link
Member

Hello

So I would like to ask whether KEDA could provide a feature to actively consume the queue.

Sadly, this is not possible, and we don't have plans to support it, as KEDA doesn't modify the messages in any manner. I can suggest different options to achieve your goals:

  • Give the HTTP add-on a try: community engagement is growing and traction is increasing, so it's a good option to take into account, even more so in this case where you don't expect huge loads (because you don't need a proxy in the middle handling thousands of requests)
  • Use an external scaler or a REST API (gRPC and REST are supported by the external scaler and the metrics API scaler), but receive the request directly there instead of putting Redis in the middle. You could store the last GET request received and, based on its timestamp, respond active or not to KEDA (see the sketch below)
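A minimal sketch of that second option (hypothetical names throughout; not an official KEDA component): record the time of the last wake-up request and report activity while it is recent enough.

package main

import (
	"net/http"
	"sync"
	"time"
)

type wakeState struct {
	mu       sync.Mutex
	lastSeen time.Time
}

func (w *wakeState) touch() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.lastSeen = time.Now()
}

// active reports whether a wake-up request arrived within the window.
func (w *wakeState) active(window time.Duration) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return time.Since(w.lastSeen) < window
}

func main() {
	state := &wakeState{}
	// The frontend calls this endpoint directly instead of going through Redis.
	http.HandleFunc("/wake", func(rw http.ResponseWriter, r *http.Request) {
		state.touch()
		rw.WriteHeader(http.StatusAccepted)
	})
	// An external scaler's IsActive/GetMetrics would consult state.active(...)
	// instead of reading (and consuming) a queue.
	http.ListenAndServe(":8080", nil)
}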

However, I found that although the Pod runs normally after being woken up, the messages in the list are consumed immediately in IsActive, which leads to the HPA target field always being 0/1 (avg) when KEDA updates the HPA resources:

I think this doesn't matter if you want to scale 0->1->0, as the HPA only acts during 1->N->1. Scaling from/to 0 is managed by the KEDA operator, and the cooldown period between the metric returning 0 and KEDA scaling to 0 is customizable directly in the ScaledObject.

@y805939188
Author

Hello

So I would like to ask whether KEDA could provide a feature to actively consume the queue.

Sadly, this is not possible, and we don't have plans to support it, as KEDA doesn't modify the messages in any manner. I can suggest different options to achieve your goals:

  • Give the HTTP add-on a try: community engagement is growing and traction is increasing, so it's a good option to take into account, even more so in this case where you don't expect huge loads (because you don't need a proxy in the middle handling thousands of requests)
  • Use an external scaler or a REST API (gRPC and REST are supported by the external scaler and the metrics API scaler), but receive the request directly there instead of putting Redis in the middle. You could store the last GET request received and, based on its timestamp, respond active or not to KEDA

However, I found that although the Pod runs normally after being woken up, the messages in the list are consumed immediately in IsActive, which leads to the HPA target field always being 0/1 (avg) when KEDA updates the HPA resources:

I think this doesn't matter if you want to scale 0->1->0, as the HPA only acts during 1->N->1. Scaling from/to 0 is managed by the KEDA operator, and the cooldown period between the metric returning 0 and KEDA scaling to 0 is customizable directly in the ScaledObject.

@JorTurFer Thank you for your advice.

I'd love to use the HTTP add-on; it really fits my scenario. In fact, if it were my own project, I would choose the HTTP add-on directly. However, I am developing a company project, so there is a little worry that my boss might blame me for using a beta version. Haha. May I ask if there are any plans to release a production version of the HTTP add-on?

And then, I want to ask a question about how KEDA updates the HPA. I have recorded a video to show my case. If you have some free time, could you please take a look at this short video? I would be extremely grateful.

[video: 20240806-113258.mp4]

In this video, I started a gRPC server locally to run a KEDA ScaledObject in external mode:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: scaledobject-nettools-11
  namespace: ding-test
spec:
  scaleTargetRef:
    name: nettools
  pollingInterval: 10
  cooldownPeriod: 20
  idleReplicaCount: 0
  # initialCooldownPeriod: 10
  minReplicaCount: 1
  maxReplicaCount: 3
  triggers:
    - type: external
      metadata:
        scalerAddress: 172.28.130.168:13190
        listName: nettools-test:keda:queue
        listLength: "1"
        databaseIndex: "6" # optional

I developed a simple Redis scaler:

package scaler

import (
	"context"
	"errors"
	"fmt"
	"strconv"

	pb "git.dp.tech/lebesgue/launching/externalscaler"
	redisClient "git.dp.tech/lebesgue/launching/pkg"
	"github.com/go-redis/redis/v8"
)

type MyRedisScaler struct {
	pb.ExternalScalerServer
	redis *redis.Client
}

func NewScaler() *MyRedisScaler {
	return &MyRedisScaler{
		redis:                redisClient.Client(),
		ExternalScalerServer: pb.UnimplementedExternalScalerServer{},
	}
}

func (scaler *MyRedisScaler) IsActive(ctx context.Context, scaledObject *pb.ScaledObjectRef) (*pb.IsActiveResponse, error) {
	fmt.Println("1111111111111111111")
	listName, ok := scaledObject.ScalerMetadata["listName"]
	if !ok {
		return nil, fmt.Errorf("listName not found in metadata")
	}

	elements, err := scaler.redis.LRange(ctx, listName, 0, -1).Result()
	if err != nil {
		return nil, err
	}

	// "Consume" the queue by deleting the whole list after reading it.
	_, err = scaler.redis.Del(ctx, listName).Result()
	if err != nil {
		return nil, err
	}

	fmt.Println("1111111111111111111 list length: ", len(elements))

	return &pb.IsActiveResponse{
		Result: len(elements) > 0,
	}, nil
}

func (scaler *MyRedisScaler) StreamIsActive(_ *pb.ScaledObjectRef, sias pb.ExternalScaler_StreamIsActiveServer) error {
	return errors.New("external push is not supported")
}

func (scaler *MyRedisScaler) GetMetricSpec(ctx context.Context, scaledObject *pb.ScaledObjectRef) (*pb.GetMetricSpecResponse, error) {
	fmt.Println("2222222222222222222")
	listLength, ok := scaledObject.ScalerMetadata["listLength"]
	fmt.Println("2222222222222222222 listLength: ", listLength)
	if !ok {
		return nil, fmt.Errorf("listLength not found in metadata")
	}

	listLengthInt64, err := strconv.ParseInt(listLength, 10, 64)
	if err != nil {
		return nil, err
	}
	return &pb.GetMetricSpecResponse{
		MetricSpecs: []*pb.MetricSpec{
			{
				MetricName: "listLength",
				TargetSize: listLengthInt64,
			},
		},
	}, nil
}

func (scaler *MyRedisScaler) GetMetrics(ctx context.Context, metricRequest *pb.GetMetricsRequest) (*pb.GetMetricsResponse, error) {
	fmt.Println("3333333333333333333")
	listName, ok := metricRequest.ScaledObjectRef.ScalerMetadata["listName"]
	if !ok {
		return nil, fmt.Errorf("listName not found in metadata")
	}

	length, err := scaler.redis.LLen(ctx, listName).Result()
	if err != nil {
		fmt.Println("error: ", err)
		return nil, err
	}

	fmt.Println("3333333333333333333 length: ", length)

	return &pb.GetMetricsResponse{
		MetricValues: []*pb.MetricValue{
			{
				MetricName:  "listLength",
				MetricValue: length,
			},
		},
	}, nil
}
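For completeness, a scaler like this would typically be wired up as a gRPC server (a sketch with assumed import paths; it relies on the standard RegisterExternalScalerServer helper that protoc generates for the externalscaler proto):

package main

import (
	"log"
	"net"

	pb "git.dp.tech/lebesgue/launching/externalscaler"
	"git.dp.tech/lebesgue/launching/scaler"
	"google.golang.org/grpc"
)

func main() {
	// Listen on the port referenced by scalerAddress in the ScaledObject.
	lis, err := net.Listen("tcp", ":13190")
	if err != nil {
		log.Fatal(err)
	}
	server := grpc.NewServer()
	// Register the Redis scaler so KEDA can call IsActive/GetMetricSpec/GetMetrics.
	pb.RegisterExternalScalerServer(server, scaler.NewScaler())
	log.Fatal(server.Serve(lis))
}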

And I have a deployment controlled by this KEDA ScaledObject. At first, my Redis list didn't have any messages, so the Pod was scaled down to 0:
[screenshot: deployment scaled down to 0 replicas]

Then I pushed 5 messages into the Redis list:
[screenshot: 5 messages in the Redis list]

Thereafter, my external scaler was triggered in the order GetMetrics -> IsActive. I read Redis in GetMetrics, got a length of 5, and returned it via MetricValues:
[screenshot: GetMetrics logs showing length 5]

Afterwards, my IsActive was invoked; I read the 5 messages from the Redis list and then deleted the list by name:
[screenshot: IsActive logs showing list length 5]

Subsequently, my Pod was woken up:
[screenshot: Pod running after wake-up]

But in my ScaledObject config, I set minReplicaCount to 1 and maxReplicaCount to 3. And I had pushed 5 messages into the Redis list, so when the KEDA operator triggered my external scaler's GetMetrics method, I returned a MetricValue of 5.

I had told KEDA that the MetricName was listLength and the TargetSize was 1 in the GetMetricSpec method:
[screenshot: GetMetricSpec logs]

And after I sent 5 messages into the Redis list, the GetMetrics method returned a listLength metric of 5.

So, according to my understanding, I thought KEDA would update the HPA's target to 5 and the Pods would scale up to 3. But I found the HPA targets column was 0/1 (avg):
[screenshot: HPA targets showing 0/1 (avg)]

I don't know whether this is a misunderstanding of KEDA on my part. I originally thought that as soon as the current metric value was returned in GetMetrics, the HPA would be updated with that value, but the behavior is different from what I expected. So I want to ask: does the current behavior meet expectations? When does KEDA trigger the update of the HPA?

Thank you very much~ 🙏🙏🙏

@JorTurFer
Member

Hello,

May I ask if there are any plans to release a production version of the HTTP add-on?

The OSS add-on GA roadmap is here -> kedacore/http-add-on#911
I know there are vendors like Kedify which offer enterprise support and a GA HTTP add-on, but I don't have any other details, just what I've read on their blog.

About your scaler: you're deleting the items from Redis during IsActive, so GetMetrics is probably returning 0. IsActive is used to scale the workload from 0 to 1, but then you need to expose the current value via GetMetrics to scale from 1 to N.
That's why I suggested using a timestamp to change the value:

Use an external scaler or a REST API (gRPC and REST are supported by the external scaler and the metrics API scaler), but receive the request directly there instead of putting Redis in the middle. You could store the last GET request received and, based on its timestamp, respond active or not to KEDA
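To illustrate, a sketch of how both methods could derive their answers from such a timestamp, so nothing has to be consumed in IsActive (hypothetical type and window; lastWake would be updated by the HTTP handler receiving the GET requests):

package scaler

import (
	"sync"
	"time"
)

// timestampState replaces the Redis list: IsActive and GetMetrics both
// read it instead of mutating a queue.
type timestampState struct {
	mu       sync.Mutex
	lastWake time.Time
	window   time.Duration // e.g. 30 * time.Second
}

func (s *timestampState) active() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return time.Since(s.lastWake) < s.window
}

// metricValue is what GetMetrics would report: 1 while the last request is
// recent (so the HPA keeps replicas), 0 once the window elapses (so KEDA
// can scale back to zero after cooldownPeriod).
func (s *timestampState) metricValue() int64 {
	if s.active() {
		return 1
	}
	return 0
}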

@y805939188
Author

Hello,

May I ask if there are any plans to release a production version of the HTTP add-on?

The OSS add-on GA roadmap is here -> kedacore/http-add-on#911 I know there are vendors like Kedify which offer enterprise support and a GA HTTP add-on, but I don't have any other details, just what I've read on their blog.

About your scaler: you're deleting the items from Redis during IsActive, so GetMetrics is probably returning 0. IsActive is used to scale the workload from 0 to 1, but then you need to expose the current value via GetMetrics to scale from 1 to N. That's why I suggested using a timestamp to change the value:

Use an external scaler or a REST API (gRPC and REST are supported by the external scaler and the metrics API scaler), but receive the request directly there instead of putting Redis in the middle. You could store the last GET request received and, based on its timestamp, respond active or not to KEDA

I have a rough idea of what I should do. Thank you for your patient explanation. 👍

@JorTurFer
Member

You're welcome! Just let me know if you have any doubts about it 😄
