Skip to content

Commit

Permalink
Merge branch 'dotnet:master' into fix-at-least-once-delivery
Browse files Browse the repository at this point in the history
  • Loading branch information
marve committed May 8, 2024
2 parents 8427636 + 4481869 commit a8d9fbd
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 44 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
* [Core] Optimized packet serialization of PUBACK and PUBREC packets for protocol version 5.0.0 (#1939, thanks to @Y-Sindo).
* [Core] The package inspector is now fully async (#1941).
* [Client] Added a dedicated exception when the client is not connected (#1954, thanks to @marcpiulachs).
* [Client] The client will now throw a _MqttClientUnexpectedDisconnectReceivedException_ when publishing a QoS 0 message which leads to a server disconnect (BREAKING CHANGE!, #1974, thanks to @fazho).
* [Client] Exposed the certificate selection event handler in client options (#1984).
* [Server] The server will no longer send _NoMatchingSubscribers_ when the actual subscription was non success (#1965, BREAKING CHANGE!).
* [Server] The server will no longer send _NoMatchingSubscribers_ when the actual subscription was non success (#1965, BREAKING CHANGE!).
* [Server] Fixed broken support for _null_ in _AddServer_ method in ASP.NET integration (#1981).
* [ManagedClient] Added a new event (SubscriptionsChangedAsync) which is fired when a subscription or unsubscription was made (#1894, thanks to @pdufrene).
* [ManagedClient] Fixed race condition when server shuts down while subscribing (#1987, thanks to @marve).
* [TopicTemplate] Added new extension which provides a template engine for topics (#1932, thanks to @simonthum).
70 changes: 42 additions & 28 deletions Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ namespace MQTTnet.Extensions.ManagedClient
{
public sealed class ManagedMqttClient : Disposable, IManagedMqttClient
{
readonly MqttNetSourceLogger _logger;

readonly AsyncEvent<InterceptingPublishMessageEventArgs> _interceptingPublishMessageEvent = new AsyncEvent<InterceptingPublishMessageEventArgs>();
readonly AsyncEvent<ApplicationMessageProcessedEventArgs> _applicationMessageProcessedEvent = new AsyncEvent<ApplicationMessageProcessedEventArgs>();
readonly AsyncEvent<ApplicationMessageSkippedEventArgs> _applicationMessageSkippedEvent = new AsyncEvent<ApplicationMessageSkippedEventArgs>();
Expand All @@ -27,9 +29,7 @@ public sealed class ManagedMqttClient : Disposable, IManagedMqttClient
readonly AsyncEvent<ManagedProcessFailedEventArgs> _synchronizingSubscriptionsFailedEvent = new AsyncEvent<ManagedProcessFailedEventArgs>();
readonly AsyncEvent<SubscriptionsChangedEventArgs> _subscriptionsChangedEvent = new AsyncEvent<SubscriptionsChangedEventArgs>();

readonly MqttNetSourceLogger _logger;
readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();

readonly AsyncLock _messageQueueLock = new AsyncLock();

/// <summary>
Expand Down Expand Up @@ -83,21 +83,19 @@ public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
add => _applicationMessageSkippedEvent.AddHandler(value);
remove => _applicationMessageSkippedEvent.RemoveHandler(value);
}

public event Func<ApplicationMessageProcessedEventArgs, Task> ApplicationMessageProcessedAsync
{
add => _applicationMessageProcessedEvent.AddHandler(value);
remove => _applicationMessageProcessedEvent.RemoveHandler(value);
}


public event Func<InterceptingPublishMessageEventArgs, Task> InterceptPublishMessageAsync
{
add => _interceptingPublishMessageEvent.AddHandler(value);
remove => _interceptingPublishMessageEvent.RemoveHandler(value);
}


public event Func<MqttApplicationMessageReceivedEventArgs, Task> ApplicationMessageReceivedAsync
{
add => InternalClient.ApplicationMessageReceivedAsync += value;
Expand Down Expand Up @@ -149,7 +147,7 @@ public ManagedMqttClient(IMqttClient mqttClient, IMqttNetLogger logger)
public ManagedMqttClientOptions Options { get; private set; }

public int PendingApplicationMessagesCount => _messageQueue.Count;

public async Task EnqueueAsync(MqttApplicationMessage applicationMessage)
{
ThrowIfDisposed();
Expand Down Expand Up @@ -277,7 +275,7 @@ public async Task StopAsync(bool cleanDisconnect = true)
ThrowIfDisposed();

_isCleanDisconnect = cleanDisconnect;

StopPublishing();
StopMaintainingConnection();

Expand Down Expand Up @@ -369,6 +367,13 @@ static TimeSpan GetRemainingTime(DateTime endTime)
return remainingTime < TimeSpan.Zero ? TimeSpan.Zero : remainingTime;
}

CancellationTokenSource NewTimeoutToken(CancellationToken linkedToken)
{
var newTimeoutToken = CancellationTokenSource.CreateLinkedTokenSource(linkedToken);
newTimeoutToken.CancelAfter(Options.ClientOptions.Timeout);
return newTimeoutToken;
}

async Task HandleSubscriptionExceptionAsync(Exception exception, List<MqttTopicFilter> addedSubscriptions, List<string> removedSubscriptions)
{
_logger.Warning(exception, "Synchronizing subscriptions failed.");
Expand Down Expand Up @@ -411,7 +416,7 @@ async Task MaintainConnectionAsync(CancellationToken cancellationToken)
{
if (_isCleanDisconnect)
{
using (var disconnectTimeout = new CancellationTokenSource(Options.ClientOptions.Timeout))
using (var disconnectTimeout = NewTimeoutToken(CancellationToken.None))
{
await InternalClient.DisconnectAsync(new MqttClientDisconnectOptions(), disconnectTimeout.Token).ConfigureAwait(false);
}
Expand Down Expand Up @@ -461,7 +466,7 @@ async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)

cancellationToken.ThrowIfCancellationRequested();

await TryPublishQueuedMessageAsync(message).ConfigureAwait(false);
await TryPublishQueuedMessageAsync(message, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
Expand All @@ -477,7 +482,7 @@ async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken)
}
}

async Task PublishReconnectSubscriptionsAsync()
async Task PublishReconnectSubscriptionsAsync(CancellationToken cancellationToken)
{
_logger.Info("Publishing subscriptions at reconnect");

Expand All @@ -489,20 +494,20 @@ async Task PublishReconnectSubscriptionsAsync()
{
topicFilters = new List<MqttTopicFilter>();
SendSubscribeUnsubscribeResult subscribeUnsubscribeResult;

foreach (var sub in _reconnectSubscriptions)
{
topicFilters.Add(sub.Value);

if (topicFilters.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
{
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null, cancellationToken).ConfigureAwait(false);
topicFilters.Clear();
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}

subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(topicFilters, null, cancellationToken).ConfigureAwait(false);
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -555,13 +560,13 @@ async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancell

if (addedTopicFilters.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
{
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null, cancellationToken).ConfigureAwait(false);
addedTopicFilters.Clear();
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null).ConfigureAwait(false);

subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(addedTopicFilters, null, cancellationToken).ConfigureAwait(false);
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);

var removedTopicFilters = new List<string>();
Expand All @@ -571,13 +576,13 @@ async Task PublishSubscriptionsAsync(TimeSpan timeout, CancellationToken cancell

if (removedTopicFilters.Count == Options.MaxTopicFiltersInSubscribeUnsubscribePackets)
{
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters, cancellationToken).ConfigureAwait(false);
removedTopicFilters.Clear();
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}

subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters).ConfigureAwait(false);
subscribeUnsubscribeResult = await SendSubscribeUnsubscribe(null, removedTopicFilters, cancellationToken).ConfigureAwait(false);
await HandleSubscriptionsResultAsync(subscribeUnsubscribeResult).ConfigureAwait(false);
}
}
Expand All @@ -592,7 +597,7 @@ async Task<ReconnectionResult> ReconnectIfRequiredAsync(CancellationToken cancel
MqttClientConnectResult connectResult = null;
try
{
using (var connectTimeout = new CancellationTokenSource(Options.ClientOptions.Timeout))
using (var connectTimeout = NewTimeoutToken(cancellationToken))
{
connectResult = await InternalClient.ConnectAsync(Options.ClientOptions, connectTimeout.Token).ConfigureAwait(false);
}
Expand All @@ -611,7 +616,7 @@ async Task<ReconnectionResult> ReconnectIfRequiredAsync(CancellationToken cancel
}
}

async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTopicFilter> addedSubscriptions, List<string> removedSubscriptions)
async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTopicFilter> addedSubscriptions, List<string> removedSubscriptions, CancellationToken cancellationToken)
{
var subscribeResults = new List<MqttClientSubscribeResult>();
var unsubscribeResults = new List<MqttClientUnsubscribeResult>();
Expand All @@ -626,8 +631,11 @@ async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTop
unsubscribeOptionsBuilder.WithTopicFilter(removedSubscription);
}

var unsubscribeResult = await InternalClient.UnsubscribeAsync(unsubscribeOptionsBuilder.Build()).ConfigureAwait(false);
unsubscribeResults.Add(unsubscribeResult);
using (var unsubscribeTimeout = NewTimeoutToken(cancellationToken))
{
var unsubscribeResult = await InternalClient.UnsubscribeAsync(unsubscribeOptionsBuilder.Build(), unsubscribeTimeout.Token).ConfigureAwait(false);
unsubscribeResults.Add(unsubscribeResult);
}

//clear because these worked, maybe the subscribe below will fail, only report those
removedSubscriptions.Clear();
Expand All @@ -642,8 +650,11 @@ async Task<SendSubscribeUnsubscribeResult> SendSubscribeUnsubscribe(List<MqttTop
subscribeOptionsBuilder.WithTopicFilter(addedSubscription);
}

var subscribeResult = await InternalClient.SubscribeAsync(subscribeOptionsBuilder.Build()).ConfigureAwait(false);
subscribeResults.Add(subscribeResult);
using (var subscribeTimeout = NewTimeoutToken(cancellationToken))
{
var subscribeResult = await InternalClient.SubscribeAsync(subscribeOptionsBuilder.Build(), subscribeTimeout.Token).ConfigureAwait(false);
subscribeResults.Add(subscribeResult);
}
}
}
catch (Exception exception)
Expand Down Expand Up @@ -697,15 +708,15 @@ async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
{
var oldConnectionState = InternalClient.IsConnected;
var connectionState = await ReconnectIfRequiredAsync(cancellationToken).ConfigureAwait(false);

if (connectionState == ReconnectionResult.NotConnected)
{
StopPublishing();
await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
}
else if (connectionState == ReconnectionResult.Reconnected)
{
await PublishReconnectSubscriptionsAsync().ConfigureAwait(false);
await PublishReconnectSubscriptionsAsync(cancellationToken).ConfigureAwait(false);
StartPublishing();
}
else if (connectionState == ReconnectionResult.Recovered)
Expand Down Expand Up @@ -735,7 +746,7 @@ async Task TryMaintainConnectionAsync(CancellationToken cancellationToken)
}
}

async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)
async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message, CancellationToken cancellationToken)
{
Exception transmitException = null;
bool acceptPublish = true;
Expand All @@ -750,7 +761,10 @@ async Task TryPublishQueuedMessageAsync(ManagedMqttApplicationMessage message)

if (acceptPublish)
{
await InternalClient.PublishAsync(message.ApplicationMessage).ConfigureAwait(false);
using (var publishTimeout = NewTimeoutToken(cancellationToken))
{
await InternalClient.PublishAsync(message.ApplicationMessage, publishTimeout.Token).ConfigureAwait(false);
}
}

using (await _messageQueueLock.EnterAsync().ConfigureAwait(false)) //lock to avoid conflict with this.PublishAsync
Expand Down

0 comments on commit a8d9fbd

Please sign in to comment.