Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 27 additions & 30 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "OpSendMsg.h"
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
#include "ResultUtils.h"
#include "Url.h"
#include "auth/InitialAuthData.h"
#include "checksum/ChecksumProvider.h"
Expand Down Expand Up @@ -205,7 +206,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
ctx.load_verify_file(trustCertFilePath);
} else {
LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
close();
close(ResultAuthenticationError, false);
return;
}
} else {
Expand All @@ -215,7 +216,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:

if (!authentication_) {
LOG_ERROR("Invalid authentication plugin");
close();
close(ResultAuthenticationError, false);
return;
}

Expand All @@ -229,12 +230,12 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
tlsPrivateKey = authData->getTlsPrivateKey();
if (!file_exists(tlsCertificates)) {
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
close();
close(ResultAuthenticationError, false);
return;
}
if (!file_exists(tlsCertificates)) {
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
close();
close(ResultAuthenticationError, false);
return;
}
ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem);
Expand Down Expand Up @@ -660,7 +661,7 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b
} else {
LOG_ERROR(cnxString_ << "Read operation failed: " << err.message());
}
close();
close(ResultDisconnected);
} else if (bytesTransferred < minReadSize) {
// Read the remaining part, use a slice of buffer to write on the next
// region
Expand Down Expand Up @@ -718,7 +719,7 @@ void ClientConnection::processIncomingBuffer() {
proto::BaseCommand incomingCmd;
if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
close();
close(ResultDisconnected);
return;
}

Expand All @@ -742,7 +743,7 @@ void ClientConnection::processIncomingBuffer() {
<< incomingCmd.message().message_id().ledgerid() << ", entry id "
<< incomingCmd.message().message_id().entryid()
<< "] Error parsing broker entry metadata");
close();
close(ResultDisconnected);
return;
}
incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize);
Expand All @@ -760,7 +761,7 @@ void ClientConnection::processIncomingBuffer() {
<< incomingCmd.message().message_id().ledgerid() //
<< ", entry id " << incomingCmd.message().message_id().entryid()
<< "] Error parsing message metadata");
close();
close(ResultDisconnected);
return;
}

Expand Down Expand Up @@ -991,7 +992,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {

default:
LOG_WARN(cnxString_ << "Received invalid message from server");
close();
close(ResultDisconnected);
break;
}
}
Expand Down Expand Up @@ -1133,7 +1134,7 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
if (err) {
LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
close();
close(ResultDisconnected);
} else {
sendPendingCommands();
}
Expand All @@ -1142,7 +1143,7 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh
void ClientConnection::handleSendPair(const boost::system::error_code& err) {
if (err) {
LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message());
close();
close(ResultDisconnected);
} else {
sendPendingCommands();
}
Expand Down Expand Up @@ -1247,7 +1248,7 @@ void ClientConnection::handleKeepAliveTimeout() {

if (havePendingPingRequest_) {
LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive timeout");
close();
close(ResultDisconnected);
} else {
// Send keep alive probe to peer
LOG_DEBUG(cnxString_ << "Sending ping message");
Expand Down Expand Up @@ -1287,7 +1288,14 @@ void ClientConnection::close(Result result, bool detach) {
}
state_ = Disconnected;

closeSocket();
if (socket_) {
boost::system::error_code err;
socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
}
if (tlsSocket_) {
boost::system::error_code err;
tlsSocket_->lowest_layer().close(err);
Expand Down Expand Up @@ -1326,7 +1334,7 @@ void ClientConnection::close(Result result, bool detach) {
}

lock.unlock();
if (result != ResultDisconnected && result != ResultRetryable) {
if (!isResultRetryable(result)) {
LOG_ERROR(cnxString_ << "Connection closed with " << result);
} else {
LOG_INFO(cnxString_ << "Connection disconnected");
Expand Down Expand Up @@ -1473,26 +1481,15 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
return promise.getFuture();
}

void ClientConnection::closeSocket() {
boost::system::error_code err;
if (socket_) {
socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
}
}

void ClientConnection::checkServerError(ServerError error) {
switch (error) {
case proto::ServerError::ServiceNotReady:
closeSocket();
close(ResultDisconnected);
break;
case proto::ServerError::TooManyRequests:
// TODO: Implement maxNumberOfRejectedRequestPerConnection like
// https://github.com/apache/pulsar/pull/274
closeSocket();
close(ResultDisconnected);
break;
default:
break;
Expand All @@ -1518,7 +1515,7 @@ void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendRe
if (!producer->ackReceived(sequenceId, messageId)) {
// If the producer fails to process the ack, we need to close the connection
// to give it a chance to recover from there
close();
close(ResultDisconnected);
}
}
} else {
Expand All @@ -1542,12 +1539,12 @@ void ClientConnection::handleSendError(const proto::CommandSendError& error) {
if (!producer->removeCorruptMessage(sequenceId)) {
// If the producer fails to remove corrupt msg, we need to close the
// connection to give it a chance to recover from there
close();
close(ResultDisconnected);
}
}
}
} else {
close();
close(ResultDisconnected);
}
}

Expand Down
11 changes: 10 additions & 1 deletion lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*/
void tcpConnectAsync();

/**
* Close the connection.
*
* @param result all pending futures will complete with this result
* @param detach remove it from the pool if it's true
*
* `detach` should only be false when:
* 1. Before the connection is put into the pool, i.e. during the construction.
* 2. When the connection pool is closed
*/
void close(Result result = ResultConnectError, bool detach = true);

bool isClosed() const;
Expand Down Expand Up @@ -392,7 +402,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
ConnectionPool& pool_;
friend class PulsarFriend;

void closeSocket();
void checkServerError(ServerError error);

void handleSendReceipt(const proto::CommandSendReceipt&);
Expand Down
3 changes: 2 additions & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "MessagesImpl.h"
#include "ProducerConfigurationImpl.h"
#include "PulsarApi.pb.h"
#include "ResultUtils.h"
#include "TimeUtils.h"
#include "TopicName.h"
#include "UnAckedMessageTrackerDisabled.h"
Expand Down Expand Up @@ -319,7 +320,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
} else {
// Consumer was not yet created, retry to connect to broker if it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (result == ResultRetryable) {
if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
scheduleReconnection();
} else {
Expand Down
5 changes: 3 additions & 2 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ClientImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "ResultUtils.h"
#include "TimeUtils.h"

DECLARE_LOG_OBJECT()
Expand Down Expand Up @@ -117,7 +118,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr&

resetCnx();

if (result == ResultRetryable) {
if (isResultRetryable(result)) {
scheduleReconnection();
return;
}
Expand Down Expand Up @@ -169,7 +170,7 @@ void HandlerBase::handleTimeout(const boost::system::error_code& ec) {
}

Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const {
if (result == ResultRetryable && (TimeUtils::now() - startTimestamp >= operationTimeut_)) {
if (isResultRetryable(result) && (TimeUtils::now() - startTimestamp >= operationTimeut_)) {
return ResultTimeout;
} else {
return result;
Expand Down
3 changes: 2 additions & 1 deletion lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "OpSendMsg.h"
#include "ProducerConfigurationImpl.h"
#include "PulsarApi.pb.h"
#include "ResultUtils.h"
#include "Semaphore.h"
#include "TimeUtils.h"
#include "TopicName.h"
Expand Down Expand Up @@ -272,7 +273,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
} else {
// Producer was not yet created, retry to connect to broker if it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (result == ResultRetryable) {
if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
scheduleReconnection();
} else {
Expand Down
29 changes: 29 additions & 0 deletions lib/ResultUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <pulsar/Result.h>

namespace pulsar {

inline bool isResultRetryable(Result result) {
return result == ResultRetryable || result == ResultDisconnected;
}

} // namespace pulsar
3 changes: 2 additions & 1 deletion lib/RetryableOperation.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "ExecutorService.h"
#include "Future.h"
#include "LogUtils.h"
#include "ResultUtils.h"

namespace pulsar {

Expand Down Expand Up @@ -95,7 +96,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
promise_.setValue(value);
return;
}
if (result != ResultRetryable) {
if (!isResultRetryable(result)) {
promise_.setFailed(result);
return;
}
Expand Down