Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using Fetch() with and OrderedConsumer and a delivery policy of DeliverLastPerSubjectPolicy returns incorrect results #1662

Open
cchamplin opened this issue Jun 21, 2024 · 1 comment
Assignees
Labels
defect Suspected defect such as a bug or regression

Comments

@cchamplin
Copy link

Observed behavior

When using Fetch() with an OrderedConsumer and a delivery policy of DeliverLastPerSubjectPolicy multiple messages for a single subject will be returned. Instead of one message per subject. The last message for each subject is not returned.

Expected behavior

The last message per subject is returned in the results. Each subject is unique.

Server and client version

Server: 2.10.16
Client 1.36.0

Host environment

N/A

Steps to reproduce

package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"

	server "github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	var bus, shutdown = StartLocalNATSServer()
	defer shutdown()

	var busAddress = "nats://" + bus.Addr().String() // use server address

	busConnection, err := nats.Connect(busAddress)
	if err != nil {
		panic(err)
	}

	NATSWaitConnected(busConnection) // wait connection if not connected yet

	cfg := jetstream.StreamConfig{
		Name:      "test-stream",
		Subjects:  []string{"test.>"},
		Storage:   jetstream.MemoryStorage,
		Retention: jetstream.LimitsPolicy,
		Replicas:  1,
	}

	js, err := jetstream.New(busConnection)
	if err != nil {
		panic(err)
	}

	stream, err := js.CreateStream(context.Background(), cfg)
	if err != nil {
		panic(err)
	}

	fmt.Println("Stream created")
	_ = stream

	_, err = js.Publish(context.Background(), "test.foo.1", []byte("Hello World!"))
	if err != nil {
		panic(err)
	}

	_, err = js.Publish(context.Background(), "test.foo.1", []byte("Hello World! 2"))
	if err != nil {
		panic(err)
	}

	_, err = js.Publish(context.Background(), "test.foo.1", []byte("Hello World! 3"))
	if err != nil {
		panic(err)
	}

	_, err = js.Publish(context.Background(), "test.foo.2", []byte("Hello World!"))
	if err != nil {
		panic(err)
	}

	_, err = js.Publish(context.Background(), "test.foo.2", []byte("Hello World! 2"))
	if err != nil {
		panic(err)
	}

	consumer, err := js.OrderedConsumer(context.Background(), "test-stream", jetstream.OrderedConsumerConfig{
		FilterSubjects: []string{
			"test.foo.>",
		},
		DeliverPolicy:     jetstream.DeliverLastPerSubjectPolicy,
		ReplayPolicy:      jetstream.ReplayInstantPolicy,
		InactiveThreshold: time.Second * 5,
	})
	if err != nil {
		panic(err)
	}

	consumerInfo := consumer.CachedInfo()
	fmt.Println("Consumer created, pending:", consumerInfo.NumPending)

	msgs, err := consumer.Fetch(int(consumerInfo.NumPending))
	if err != nil {
		panic(err)
	}

	for msg := range msgs.Messages() {
		fmt.Println(msg.Subject())
	}

	// Output:
	// Stream created
	// Consumer created, pending: 2
	// test.foo.1
	// test.foo.1

	// Expected output:
	// Stream created
	// Consumer created, pending: 2
	// test.foo.1
	// test.foo.2
}

func StartLocalNATSServer() (s *server.Server, shutdown func()) {
	s, err := server.NewServer(&server.Options{
		Host:           "127.0.0.1",
		Port:           server.RANDOM_PORT,
		NoLog:          true,
		NoSigs:         true,
		MaxControlLine: 2048,
		JetStream:      true,
		StoreDir:       filepath.Join(os.TempDir(), server.JetStreamStoreDir, "tmp"),
	})
	if err != nil {
		panic(err)
	}

	go func() {
		err := server.Run(s)
		if err != nil {
			panic(err)
		}
	}()

	if s == nil {
		panic("starting nats server: nil")
	}

	if !s.ReadyForConnections(5 * time.Second) {
		panic("starting nats server: timeout")
	}
	return s, func() {
		s.Shutdown()
	}
}

func NATSWaitConnected(c *nats.Conn) {
	timeout := time.Now().Add(5 * time.Second)
	for time.Now().Before(timeout) {
		if c.IsConnected() {
			return
		}
		time.Sleep(25 * time.Millisecond)
	}
	panic("nats connection timeout")
}
@cchamplin cchamplin added the defect Suspected defect such as a bug or regression label Jun 21, 2024
@Jarema
Copy link
Member

Jarema commented Jun 24, 2024

I was able to reproduce this bug.
It seems that for some reason the ordered consumer with this config is instantly recreated, and recreated consumer always resumes as last known sequence.

We will issue a fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

3 participants