Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to use subscription identifier - Duplicate messages with overlapping topics #596

Open
2 tasks done
TDYJeffreyDevloo opened this issue Sep 26, 2023 · 2 comments
Open
2 tasks done
Labels

Comments

@TDYJeffreyDevloo
Copy link

TDYJeffreyDevloo commented Sep 26, 2023

Checklist

  • I've searched the project's issues.
  • I've searched the project's discussions.
    Could not look into the discussions as the link does not seem to work.

❓ Question

MQTT5Client fires the callback multiple times if overlapping subscriptions are used. Mosquitto has an option to disable this (see https://mosquitto.org/man/mosquitto-conf-5.html -- allow_duplicate_messages) but MQTT5 describes the subscription_identifier field that is sent with every message so the client can couple the right callback to the right message.

The hiveMQ client does not expose this as an option directly but it is setting the value automatically if the broker supports it.
A small demo app to demonstrate the issue. I am using Mosquitto 2.0.18 with allow_duplicate_messages=true.

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.reactor.Mqtt5ReactorClient;
import reactor.core.publisher.Mono;

public class Test {
   public static void main(String[] args) throws InterruptedException {
       Mqtt5ReactorClient serverClient = Mqtt5ReactorClient.from(Mqtt5Client.builder()
               .identifier("myServer")
               .serverHost("localhost")
               .serverPort(1883)
               .build());

       Mqtt5ReactorClient deviceClient = Mqtt5ReactorClient.from(Mqtt5Client.builder()
               .identifier("myDevice")
               .serverHost("localhost")
               .serverPort(1883)
               .build());

       // Subscribe to everything wildcard
       Mono<Void> subToAll = serverClient.subscribePublishesWith()
               .topicFilter("#")
               .qos(MqttQos.AT_LEAST_ONCE)
               .applySubscribe()
               .doOnSingle(subAck -> System.out.println("Successfully subscribed to #"))
               .doOnNext(publish -> System.out.println("# - Received message"))
               .then();

       Mono<Void> subToOne = serverClient.subscribePublishesWith()
               .topicFilter("A")
               .qos(MqttQos.AT_LEAST_ONCE)
               .applySubscribe()
               .doOnSingle(subAck -> System.out.println("Successfully subscribed to A"))
               .doOnNext(publish -> System.out.println("A - Received message"))
               .then();

       // Start server
       serverClient.connect()
               .then(Mono.when(subToAll, subToOne))
               .subscribe();

       Thread.sleep(100);
       // Start client
       Mqtt5Publish a = Mqtt5Publish.builder()
               .topic("A")
               .payload("1".getBytes())
               .build();
       Mono<Mqtt5PublishResult> sendRequest = deviceClient.publish(Mono.just(a))
               .doOnNext(publishResult -> System.out.println("Successfully sent to A"))
               .next();
       deviceClient.connect()
               .then(sendRequest)
               .subscribe();

   }
}

Outputs:

Successfully subscribed to #
Successfully subscribed to A
Successfully sent to A
A - Received message
# - Received message
A - Received message
# - Received message

Is it possible to use this subscription identifier in our subscriptions to avoid duplicate messages on the client side? Or are there other mechanisms that we can use on the client side of things to avoid this problem? Updating the QoS does not have any effect.

The mosquitto output is

1695716431: Sending CONNACK to myDevice (0, 0)
1695716431: Received PUBLISH from myDevice (d0, q0, r0, m0, 'A', ... (1 bytes))
1695716431: Sending PUBLISH to myServer (d0, q0, r0, m0, 'A', ... (1 bytes))
1695716431: Sending PUBLISH to myServer (d0, q0, r0, m0, 'A', ... (1 bytes))

📎 Additional context

There is support for the subscription identifier. I've delved into the code and seen that it's automatically assigned to every subscription, starting from int 1.
The decoder also looks for the identifier but overlapping subscriptions don't see to be using them to invoke the right callback.

@TDYJeffreyDevloo TDYJeffreyDevloo changed the title How to use subscription identifier - Duplicate messages with openlapping topics How to use subscription identifier - Duplicate messages with overlapping topics Sep 26, 2023
@TDYJeffreyDevloo
Copy link
Author

Digging even deeper into the code I've found that it's because of the MqttIncomingPublishService#onPublish invoking MqttIncomingPublishFlows#findMatching only reading from global flows. The subscribedFlows contains the right subscription identifier which can be matched onto the identifier of the MqttStatefulPublish but the callbacks chosen are selected from the MqttIncomingPublishFlows#globalFlows. The add functionality there should be taking the subscription identifer into account.

The Paho example that does it correctly is:

import com.hivemq.client.mqtt.datatypes.MqttQos;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class TestPaho
{

  public static void main(String[] args) throws InterruptedException, MqttException
  {
    MqttAsyncClient serverClient = new MqttAsyncClient("tcp://localhost:1883", "myServer");
    MqttAsyncClient deviceClient = new MqttAsyncClient("tcp://localhost:1883", "myDevice");

    serverClient.connect().waitForCompletion();
    deviceClient.connect().waitForCompletion();

    System.out.println("Connected");

    CountDownLatch receivedLatch = new CountDownLatch(2);

    MqttProperties subProperties1 = new MqttProperties();
    subProperties1.setSubscriptionIdentifiers(List.of(0)); // Bug in paho? This is for publish but subscribe requires it
    subProperties1.setSubscriptionIdentifier(1);
    serverClient.subscribe(new MqttSubscription("#", MqttQos.AT_LEAST_ONCE.getCode()), null, null, (topic, message) ->
    {
      System.out.println("# - Received message");
      receivedLatch.countDown();
    }, subProperties1);

    MqttProperties subProperties2 = new MqttProperties();
    subProperties2.setSubscriptionIdentifiers(List.of(0));
    subProperties2.setSubscriptionIdentifier(2);
    serverClient.subscribe(new MqttSubscription("A", MqttQos.AT_LEAST_ONCE.getCode()), null, null, (topic, message) ->
    {
      System.out.println("A - Received message");
      receivedLatch.countDown();
    }, subProperties2);

    System.out.println("A - Sending");
    deviceClient.publish("A", new MqttMessage("1".getBytes(), MqttQos.AT_LEAST_ONCE.getCode(), false, new MqttProperties()))
            .waitForCompletion();

    System.out.println("A - Sent");

    receivedLatch.await();
    deviceClient.close();
    serverClient.close();
  }
}

@pglombardo pglombardo added bug and removed question labels Mar 6, 2024
@pglombardo
Copy link
Contributor

Thanks for the thorough investigation @TDYJeffreyDevloo. I'm not sure yet if this is by design currently but if so, then this should be a feature request to configure the behavior to use strict topic alias matching. @SgtSilvio should have better insight into this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants