Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cannot subscribed to different topics withj the same client #100

Closed
gadieichhorn opened this issue Jun 7, 2018 · 5 comments
Closed

Cannot subscribed to different topics withj the same client #100

gadieichhorn opened this issue Jun 7, 2018 · 5 comments

Comments

@gadieichhorn
Copy link

I think this is a missing option with the spec. I would like to register different handlers to different topics with the same client so I can handle different topics in specific way.

here is a simple test that breaks becuase the first handler will take all messages

    @Test
    public void canSubscribeToDIfferentTopicsTest(TestContext context) {
        Async async = context.async(2);

        client.publishHandler(s -> {
            String name = s.payload().toJsonObject().getString("name");
            log.info("NAME: {}", name);
            context.assertEquals("a", name);
            async.countDown();
        })
            .subscribe("test/a", MqttQoS.AT_LEAST_ONCE.value())
            .publish("test/a", new JsonObject().put("name", "a").toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false);

        client.publishHandler(s -> {
            String name = s.payload().toJsonObject().getString("name");
            log.info("NAME: {}", name);
            context.assertEquals("b", name);
            async.countDown();
        })
            .subscribe("test/b", MqttQoS.AT_LEAST_ONCE.value())
            .publish("test/b", new JsonObject().put("name", "b").toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false);
    }

Do I have to create seperate clients for each publish handler?

@Sammers21
Copy link
Contributor

@gadieichhorn, yes, it is designed in a way to handle all the incoming publishes by the one handler, so it is not a bug.

There is some issue related to the described feature functionality: #55.

@Sammers21
Copy link
Contributor

If you would like to, you can prototype the API you would like to see in the client and submit a PR. We will see and discuss.

@gadieichhorn
Copy link
Author

ok, this make sense, basically it is ok to use different clients for differnet topics in the same verticle?

I also looked at my old Paho code, it works the same way.

here is how I changed the tests to work

@Slf4j
@RunWith(VertxUnitRunner.class)
public class BrokerTest {

    private Vertx vertx;

    @Before
    public void before(TestContext context) {
        vertx = Vertx.vertx();
        InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
    }

    @After
    public void after() {
    }

    private void create(Vertx vertx, TestContext context, Consumer<MqttClient> response) {
        MqttClient client = MqttClient.create(vertx, new MqttClientOptions()
            .setClientId(UUID.randomUUID().toString())
            .setUsername("test")
            .setPassword("test"));

        client.connect(
            IntegrationSuite.getMosquitto().getMappedPort(1883).intValue(),
            IntegrationSuite.getMosquitto().getContainerIpAddress(),
            connect -> {
                if (connect.succeeded()) {
                    response.accept(client);
                } else {
                    log.error("Connection failed", connect.cause());
                    context.fail(connect.cause());
                }
            });
    }

    @Test(timeout = 2000)
    public void canSubscribeToDIfferentTopicsTest(TestContext context) {
        Async async = context.async(2);

        create(vertx, context, client -> {
            client.publishHandler(s -> {
                String name = s.payload().toJsonObject().getString("name");
                log.info("NAME: {}", name);
                context.assertEquals("a", name);
                async.countDown();
            })
                .subscribe("test/a", MqttQoS.AT_LEAST_ONCE.value())
                .publish("test/a", new JsonObject().put("name", "a").toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false);

        });

        create(vertx, context, client -> {
            client.publishHandler(s -> {
                String name = s.payload().toJsonObject().getString("name");
                log.info("NAME: {}", name);
                context.assertEquals("b", name);
                async.countDown();
            })
                .subscribe("test/b", MqttQoS.AT_LEAST_ONCE.value())
                .publish("test/b", new JsonObject().put("name", "b").toBuffer(), MqttQoS.AT_LEAST_ONCE, false, false);
        });

    }
}

@Sammers21
Copy link
Contributor

@gadieichhorn, Yes, I think it's fine, but I would not recommend it. I would figure out what is the path of the message in publishHandler and routed it to a related handler, so it is super easy.

@ppatierno
Copy link
Member

Why having a different handler for each subscribed topic ? or why having a different client for each topic to subscribe ? The publishHandler parameter provides you the topicName method in order to get the information about on which subscribed topic the message was received. I think that it's enough.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

3 participants