Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,18 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws
synchronized (clientIdSet) {
oldContext = clientIdSet.get(clientId);
if (oldContext != null) {
if (context.isAllowLinkStealing()) {
Connection oldConnection = oldContext.getConnection();
// Allow the new connection if link-stealing is enabled OR if the old
// connection is already in the process of stopping (race condition where
// the client reconnects before the broker has finished cleaning up the
// previous disconnected connection from clientIdSet).
boolean oldConnectionStopping = oldConnection instanceof TransportConnection
&& ((TransportConnection) oldConnection).isStopping();
if (context.isAllowLinkStealing() || oldConnectionStopping) {
clientIdSet.put(clientId, context);
} else {
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
+ oldContext.getConnection().getRemoteAddress());
+ oldConnection.getRemoteAddress());
}
} else {
clientIdSet.put(clientId, context);
Expand All @@ -274,7 +281,11 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws
if (oldContext != null) {
if (oldContext.getConnection() != null) {
Connection connection = oldContext.getConnection();
LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection());
if (connection instanceof TransportConnection && ((TransportConnection) connection).isStopping()) {
LOG.debug("Reconnect for clientId {} allowed; old connection {} is already stopping", clientId, connection);
} else {
LOG.warn("Stealing link for clientId {} From Connection {}", clientId, connection);
}
if (connection instanceof TransportConnection) {
TransportConnection transportConnection = (TransportConnection) connection;
transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId()));
Expand Down