Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retained messages aren't pushed to the listener #616

Open
andsel opened this issue Feb 4, 2024 · 3 comments
Open

Retained messages aren't pushed to the listener #616

andsel opened this issue Feb 4, 2024 · 3 comments
Labels

Comments

@andsel
Copy link

andsel commented Feb 4, 2024

🐛 Bug Report

A publisher publish a retained message, after that a subscriber subscribe to the same topic and should receive the retained message despite at wire level the publish happens.

🔬 How To Reproduce

Steps to reproduce the behavior:

Run the next test code

Code sample

@Test
    public void test() throws Exception {
        // connect a publisher
        final Mqtt5BlockingClient publisher = com.hivemq.client.mqtt.MqttClient.builder()
            .useMqttVersion5()
            .identifier("publisher")
            .serverHost("localhost")
            .serverPort(1883)
            .buildBlocking();
        Mqtt5Connect connectRequest = Mqtt5Connect.builder()
            .cleanStart(true)
            .build();
        assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, publisher.connect(connectRequest).getReasonCode(), "publisher connected");

        // publish a retained message on topic metric/temperature/living
        publisher.publishWith()
            .topic("metric/temperature/living")
            .payload("18".getBytes(StandardCharsets.UTF_8))
            .retain(true)
            .qos(MqttQos.AT_LEAST_ONCE)
            .send();

        // connect a subscriber to the same topic
        final Mqtt5BlockingClient subscriber = com.hivemq.client.mqtt.MqttClient.builder()
            .useMqttVersion5()
            .identifier("subscriber_with_retain_as_published")
            .serverHost("localhost")
            .serverPort(1883)
            .buildBlocking();
        assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, subscriber.connect().getReasonCode(), "subscriber_with_retain_as_published" + " connected");
        subscriber.subscribeWith()
            .topicFilter("metric/temperature/living")
            .qos(MqttQos.AT_LEAST_ONCE)
            .send();

        // publish reaches the subscriber and also retain flag should be true
        try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
            Optional<Mqtt5Publish> publishMessage = publishes.receive(2, TimeUnit.SECONDS);
            if (!publishMessage.isPresent()) {
                fail("Expected to receive a publish message");
                return;
            }
            Mqtt5Publish msg = publishMessage.get();
            final String payload = new String(msg.getPayloadAsBytes(), StandardCharsets.UTF_8);
            assertEquals("18", payload, "Shared message must be received");
            assertEquals(MqttQos.AT_LEAST_ONCE, msg.getQos());
            assertTrue(msg.isRetain(), "Publish must be retained also if the subscription 'retain as published' is set to 1");
        }
    }

Environment

Where are you running/using this client?
On embedded test

Hardware or Device?
not important

What version of this client are you using?
1.3.3

JVM version?
Adoptium 17

Operating System?
MacOS

Which MQTT protocol version is being used?
MQTT 5

Which MQTT broker (name and version)?
Moquette 0.18-SNAPSHOT

Screenshots

📈 Expected behavior

Once a subscriber subscribe to a topic that matches a previously retained message, when the PUB message reaches the client, than that message should be pushed to the listener.

📎 Additional context

pcap that demonstrate that the publish reaches the client.
pub_retained.pcapng.gz

@andsel andsel added the bug label Feb 4, 2024
@andsel
Copy link
Author

andsel commented Feb 10, 2024

I think I've found an issue that could be related to this:

@Test
    public void test() throws Exception {
        final Mqtt5BlockingClient publisher = com.hivemq.client.mqtt.MqttClient.builder()
            .useMqttVersion5()
            .identifier("publisher")
            .serverHost("localhost")
            .serverPort(1883)
            .buildBlocking();
        Mqtt5Connect connectRequest = Mqtt5Connect.builder()
            .cleanStart(true)
            .build();
        assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, publisher.connect(connectRequest).getReasonCode(), "publisher" + " connected");
        //publish a retained message
        publisher.publishWith()
            .topic("metric/temperature/living")
            .payload("18".getBytes(StandardCharsets.UTF_8))
            .retain(true)
            .qos(MqttQos.AT_LEAST_ONCE)
            .send();

        // receive retained only if new subscription
        final Mqtt5BlockingClient subscriber = com.hivemq.client.mqtt.MqttClient.builder()
            .useMqttVersion5()
            .identifier("subscriber")
            .serverHost("localhost")
            .serverPort(1883)
            .buildBlocking();
        assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, subscriber.connect().getReasonCode(), "subscriber" + " connected");
        subscriber.subscribeWith()
            .topicFilter("metric/temperature/living")
            .qos(MqttQos.AT_LEAST_ONCE)
            .retainHandling(Mqtt5RetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST)
            .send();

        try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
            Optional<Mqtt5Publish> publishMessage = publishes.receive(1, TimeUnit.SECONDS);
            if (!publishMessage.isPresent()) {
                fail("Expected to receive a publish message");
                return;
            }
            Mqtt5Publish pub = publishMessage.get();
            assertEquals("18", new String(pub.getPayloadAsBytes(), StandardCharsets.UTF_8));
        }
    }

@Serkan80
Copy link

I'm having the same problem with version 1.3.4.

I send 3 messages to a topic, while there are no subscribers. And when the subscriber connects, it receives only the last message that was published. The first two messages seem to lost.

And I've a similar code setup as OP, except that I also call .whenComplete(), after send().

@andsel
Copy link
Author

andsel commented Dec 13, 2024

@Serkan80 I think that the problem resides in the sequence of operation, in my test I've done:

  1. subscribe
  2. setup the listener

while the correct sequence should be:

  1. setup a listener for the subscriber
  2. subscribe

Else it could be that the publish is received before the listener is set up.
So, switch from

       subscriber.subscribeWith()
            .topicFilter("metric/temperature/living")
            .qos(MqttQos.AT_LEAST_ONCE)
            .retainHandling(Mqtt5RetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST)
            .send();

        try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
            Optional<Mqtt5Publish> publishMessage = publishes.receive(1, TimeUnit.SECONDS);
            if (!publishMessage.isPresent()) {
                fail("Expected to receive a publish message");
                return;
            }
            Mqtt5Publish pub = publishMessage.get();
            assertEquals("18", new String(pub.getPayloadAsBytes(), StandardCharsets.UTF_8));
        }

to

        try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
           subscriber.subscribeWith()
              .topicFilter("metric/temperature/living")
              .qos(MqttQos.AT_LEAST_ONCE)
              .retainHandling(Mqtt5RetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST)
              .send();

            Optional<Mqtt5Publish> publishMessage = publishes.receive(1, TimeUnit.SECONDS);
            if (!publishMessage.isPresent()) {
                fail("Expected to receive a publish message");
                return;
            }
            Mqtt5Publish pub = publishMessage.get();
            assertEquals("18", new String(pub.getPayloadAsBytes(), StandardCharsets.UTF_8));
        }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants