Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

CompletableFuture returned from subscribe on a CONNECTING async client never completes if the client does not connect #612

Open
jfontsaballs opened this issue Jan 11, 2024 · 0 comments
Labels

Comments

@jfontsaballs
Copy link

馃悰 Bug Report

CompletableFuture returned from subscribe on a CONNECTING async client never completes if the client does not connect. Also, after this the client can not be stopped and reconnection seems to stop happening.

馃敩 How To Reproduce

Please see the code sample below, executed without a broker. For this behavior to occur the client never connects to the broker.

A similarly suspicious behavior happens if you call subscribe while the client is reconnecting, although I have not been able to analyze it properly.

Code sample

Kotiln:

import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect
import java.util.UUID
import java.util.concurrent.TimeUnit

fun main() {
    var shouldStop = false
    val client = com.hivemq.client.mqtt.MqttClient.builder()
        .identifier(UUID.randomUUID().toString())
        .serverHost("localhost")
        .automaticReconnect(
            MqttClientAutoReconnect.builder()
                .initialDelay(500, TimeUnit.MILLISECONDS)
                .maxDelay(5000, TimeUnit.MILLISECONDS)
                .build()
        )
        .addDisconnectedListener { context ->
            println("Disconnected")
            if (shouldStop)
                context.reconnector.reconnect(false)
        }
        .useMqttVersion5()
        .buildAsync()

    // Start connection but don't wait for it to complete
    val connectionFuture = client.connectWith()
        .cleanStart(true)
        .keepAlive(10 /*seconds*/)
        .send()

    println(client.state) // CONNECTING

    // Publish, it throws as expected
    try {
        client.publishWith()
            .topic("something")
            .payload("whatever".toByteArray())
            .send().join()
    } catch (e: Throwable) {
        println(e) //com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
    }

    val subscribeFuture = client.subscribeWith()
        .topicFilter("something")
        .send()
    Thread.sleep(5000)
    //subscribeFuture.join() !! Never completes
    println(subscribeFuture)

    // Then I tried setting a timeout
    try {
        client.subscribeWith()
            .topicFilter("something")
            .send()
            .orTimeout(2, TimeUnit.SECONDS)
            .join()
    } catch (e: Throwable) {
        println(e)
    }

    shouldStop = true
    connectionFuture.cancel(true)
    try {
        client.disconnect().join()
    } catch (e: Throwable) {
        println(e)
    }
    println("END")
    // After this, application never stops neither it tries to reconnect with the broker
}

Output

CONNECTING
java.util.concurrent.CompletionException: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
Disconnected
Disconnected
Disconnected
com.hivemq.client.internal.rx.RxFutureConverter$RxSingleFuture@238d68ff[Not completed]
Disconnected
java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
java.util.concurrent.CompletionException: com.hivemq.client.mqtt.exceptions.MqttClientStateException: MQTT client is not connected.
END

Environment

Where are you running/using this client?

Hardware or Device?
Laptop with Intel i7 Processor

What version of this client are you using?
1.3.0

JVM version?
Java 21, Gradle 8.5

Operating System?
Windows 10

Which MQTT protocol version is being used?
5

Which MQTT broker (name and version)?
None

馃搱 Expected behavior

Subscribe throws if it can't get through, similar to publish

馃搸 Additional context

Bug found while analyzing the behavior of the client under communication difficulties with the broker.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant