Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ ChannelSSLOptions* ChannelOptions::mutable_ssl_options() {
static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
if (opt.auth == NULL &&
!opt.has_ssl_options() &&
opt.client_host.empty() &&
opt.device_name.empty() &&
opt.connection_group.empty() &&
opt.hc_option.health_check_path.empty()) {
// Returning zeroized result by default is more intuitive for users.
Expand All @@ -94,6 +96,14 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
buf.append("|conng=");
buf.append(opt.connection_group);
}
if (!opt.client_host.empty()) {
buf.append("|clih=");
buf.append(opt.client_host);
}
if (!opt.device_name.empty()) {
buf.append("|devn=");
buf.append(opt.device_name);
}
if (opt.auth) {
buf.append("|auth=");
buf.append((char*)&opt.auth, sizeof(opt.auth));
Expand Down Expand Up @@ -362,14 +372,27 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
LOG(ERROR) << "Invalid port=" << port;
return -1;
}
butil::EndPoint client_endpoint;
if (!_options.client_host.empty() &&
butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0 &&
butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0) {
LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\'';
return -1;
}
_server_address = server_addr_and_port;
const ChannelSignature sig = ComputeChannelSignature(_options);
std::shared_ptr<SocketSSLContext> ssl_ctx;
if (CreateSocketSSLContext(_options, &ssl_ctx) != 0) {
return -1;
}
SocketOptions opt;
opt.local_side = client_endpoint;
opt.initial_ssl_ctx = ssl_ctx;
opt.use_rdma = _options.use_rdma;
opt.hc_option = _options.hc_option;
opt.device_name = _options.device_name;
if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
&_server_id, ssl_ctx, _options.use_rdma, _options.hc_option) != 0) {
&_server_id, opt) != 0) {
LOG(ERROR) << "Fail to insert into SocketMap";
return -1;
}
Expand Down Expand Up @@ -397,6 +420,13 @@ int Channel::Init(const char* ns_url,
_options.mutable_ssl_options()->sni_name = _service_name;
}
}
butil::EndPoint client_endpoint;
if (!_options.client_host.empty() &&
butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0 &&
butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0) {
LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\'';
return -1;
}
std::unique_ptr<LoadBalancerWithNaming> lb(new (std::nothrow)
LoadBalancerWithNaming);
if (NULL == lb) {
Expand All @@ -406,10 +436,13 @@ int Channel::Init(const char* ns_url,
GetNamingServiceThreadOptions ns_opt;
ns_opt.succeed_without_server = _options.succeed_without_server;
ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
ns_opt.use_rdma = _options.use_rdma;
ns_opt.socket_option.use_rdma = _options.use_rdma;
ns_opt.channel_signature = ComputeChannelSignature(_options);
ns_opt.hc_option = _options.hc_option;
if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) {
ns_opt.socket_option.hc_option = _options.hc_option;
ns_opt.socket_option.local_side = client_endpoint;
ns_opt.socket_option.device_name = _options.device_name;
if (CreateSocketSSLContext(_options,
&ns_opt.socket_option.initial_ssl_ctx) != 0) {
return -1;
}
if (lb->Init(ns_url, lb_name, _options.ns_filter, &ns_opt) != 0) {
Expand Down
10 changes: 10 additions & 0 deletions src/brpc/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ struct ChannelOptions {
// Its priority is higher than FLAGS_health_check_path and FLAGS_health_check_timeout_ms.
// When it is not set, FLAGS_health_check_path and FLAGS_health_check_timeout_ms will take effect.
HealthCheckOption hc_option;

// IP address or host name of the client.
// if the client_host is "", the client IP address is determined by the OS.
// Default: ""
std::string client_host;

// The device name of the client's network adapter.
// if the device_name is "", the flow control is determined by the OS.
// Default: ""
std::string device_name;
private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ChannelOptions from being bloated in most cases.
Expand Down
4 changes: 2 additions & 2 deletions src/brpc/details/naming_service_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ void NamingServiceThread::Actions::ResetServers(
// Socket. SocketMapKey may be passed through AddWatcher. Make sure
// to pick those Sockets with the right settings during OnAddedServers
const SocketMapKey key(_added[i], _owner->_options.channel_signature);
CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx,
_owner->_options.use_rdma, _owner->_options.hc_option));
CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id,
_owner->_options.socket_option));
_added_sockets.push_back(tagged_id);
}

Expand Down
9 changes: 4 additions & 5 deletions src/brpc/details/naming_service_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ class NamingServiceWatcher {
struct GetNamingServiceThreadOptions {
GetNamingServiceThreadOptions()
: succeed_without_server(false)
, log_succeed_without_server(true)
, use_rdma(false) {}
, log_succeed_without_server(true) {
socket_option.use_rdma = false;
}

bool succeed_without_server;
bool log_succeed_without_server;
bool use_rdma;
HealthCheckOption hc_option;
ChannelSignature channel_signature;
std::shared_ptr<SocketSSLContext> ssl_ctx;
SocketOptions socket_option;
};

// A dedicated thread to map a name to ServerIds
Expand Down
25 changes: 23 additions & 2 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,8 @@ int Socket::OnCreated(const SocketOptions& options) {
_keytable_pool = options.keytable_pool;
_tos = 0;
_remote_side = options.remote_side;
_local_side = butil::EndPoint();
_local_side = options.local_side;
_device_name = options.device_name;
_on_edge_triggered_events = options.on_edge_triggered_events;
_user = options.user;
_conn = options.conn;
Expand Down Expand Up @@ -1296,7 +1297,25 @@ int Socket::Connect(const timespec* abstime,
CHECK_EQ(0, butil::make_close_on_exec(sockfd));
// We need to do async connect (to manage the timeout by ourselves).
CHECK_EQ(0, butil::make_non_blocking(sockfd));

if (!_device_name.empty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不是很确定 #3157 需要的是指定source IP,还是指定网络设备。如果只需要指定source IP, 之前的bind可以满足需求。这里要不要把之前的那个client_host选项一并加进来?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

确实可以加进来,让网络配置更精细化,我有空合并一下

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wenjiecn 大佬有空整理一下吗

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@372046933 done

if (setsockopt(sockfd, SOL_SOCKET, SO_BINDTODEVICE,
_device_name.c_str(), _device_name.size()) < 0) {
PLOG(ERROR) << "Fail to set SO_BINDTODEVICE of fd=" << sockfd
<< " to device_name=" << _device_name;
return -1;
}
}
if (local_side().ip != butil::IP_ANY) {
struct sockaddr_storage cli_addr;
if (butil::endpoint2sockaddr(local_side(), &cli_addr, &addr_size) != 0) {
PLOG(ERROR) << "Fail to get client sockaddr";
return -1;
}
if (::bind(sockfd, (struct sockaddr*)&cli_addr, addr_size) != 0) {
PLOG(ERROR) << "Fail to bind client socket, errno=" << strerror(errno);
return -1;
}
}
const int rc = ::connect(
sockfd, (struct sockaddr*)&serv_addr, addr_size);
if (rc != 0 && errno != EINPROGRESS) {
Expand Down Expand Up @@ -2811,6 +2830,7 @@ int Socket::GetPooledSocket(SocketUniquePtr* pooled_socket) {
if (socket_pool == NULL) {
SocketOptions opt;
opt.remote_side = remote_side();
opt.local_side = butil::EndPoint(local_side().ip, 0);
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
opt.initial_ssl_ctx = _ssl_ctx;
Expand Down Expand Up @@ -2912,6 +2932,7 @@ int Socket::GetShortSocket(SocketUniquePtr* short_socket) {
SocketId id;
SocketOptions opt;
opt.remote_side = remote_side();
opt.local_side = butil::EndPoint(local_side().ip, 0);
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
opt.initial_ssl_ctx = _ssl_ctx;
Expand Down
5 changes: 5 additions & 0 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ struct SocketOptions {
// user->BeforeRecycle() before recycling.
int fd{-1};
butil::EndPoint remote_side;
butil::EndPoint local_side;
std::string device_name;
// If `connect_on_create' is true and `fd' is less than 0,
// a client connection will be established to remote_side()
// regarding deadline `connect_abstime' when Socket is being created.
Expand Down Expand Up @@ -830,6 +832,9 @@ friend void DereferenceSocket(Socket*);
// Address of self. Initialized in ResetFileDescriptor().
butil::EndPoint _local_side;

// The device name of the client's network adapter.
std::string _device_name;

// Called when edge-triggered events happened on `_fd'. Read comments
// of EventDispatcher::AddConsumer (event_dispatcher.h)
// carefully before implementing the callback.
Expand Down
16 changes: 4 additions & 12 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,9 @@ SocketMap* get_or_new_client_side_socket_map() {
}

int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
const HealthCheckOption& hc_option) {
return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option);
}
SocketOptions& opt) {
return get_or_new_client_side_socket_map()->Insert(key, id, opt);
}

int SocketMapFind(const SocketMapKey& key, SocketId* id) {
SocketMap* m = get_client_side_socket_map();
Expand Down Expand Up @@ -227,9 +225,7 @@ void SocketMap::ShowSocketMapInBvarIfNeed() {
}

int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
const HealthCheckOption& hc_option) {
SocketOptions& opt) {
ShowSocketMapInBvarIfNeed();

std::unique_lock<butil::Mutex> mu(_mutex);
Expand All @@ -249,11 +245,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
sc = NULL;
}
SocketId tmp_id;
SocketOptions opt;
opt.remote_side = key.peer.addr;
opt.initial_ssl_ctx = ssl_ctx;
opt.use_rdma = use_rdma;
opt.hc_option = hc_option;
if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) {
PLOG(FATAL) << "Fail to create socket to " << key.peer;
return -1;
Expand Down
22 changes: 20 additions & 2 deletions src/brpc/socket_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,19 @@ struct SocketMapKeyHasher {
// successfully, SocketMapRemove() MUST be called when the Socket is not needed.
// Return 0 on success, -1 otherwise.
int SocketMapInsert(const SocketMapKey& key, SocketId* id,
SocketOptions& opt);

inline int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
const HealthCheckOption& hc_option);
const HealthCheckOption& hc_option) {
SocketOptions opt;
opt.remote_side = key.peer.addr;
opt.initial_ssl_ctx = ssl_ctx;
opt.use_rdma = use_rdma;
opt.hc_option = hc_option;
return SocketMapInsert(key, id, opt);
}

inline int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
Expand Down Expand Up @@ -155,7 +165,14 @@ class SocketMap {
int Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
const HealthCheckOption& hc_option);
const HealthCheckOption& hc_option) {
SocketOptions opt;
opt.remote_side = key.peer.addr;
opt.initial_ssl_ctx = ssl_ctx;
opt.use_rdma = use_rdma;
opt.hc_option = hc_option;
return Insert(key, id, opt);
}

int Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
Expand All @@ -167,6 +184,7 @@ class SocketMap {
HealthCheckOption hc_option;
return Insert(key, id, empty_ptr, false, hc_option);
}
int Insert(const SocketMapKey& key, SocketId* id, SocketOptions& opt);

void Remove(const SocketMapKey& key, SocketId expected_id);
int Find(const SocketMapKey& key, SocketId* id);
Expand Down
45 changes: 45 additions & 0 deletions test/brpc_server_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2070,4 +2070,49 @@ TEST_F(ServerTest, auth) {
ASSERT_EQ(0, server.Join());
}

void TestClientHost(const butil::EndPoint& ep,
brpc::Controller& cntl,
int error_code, bool failed,
brpc::ChannelOptions& copt) {
brpc::Channel chan;
copt.max_retry = 0;
ASSERT_EQ(0, chan.Init(ep, &copt));

test::EchoRequest req;
test::EchoResponse res;
req.set_message(EXP_REQUEST);
test::EchoService_Stub stub(&chan);
stub.Echo(&cntl, &req, &res, NULL);
ASSERT_EQ(cntl.Failed(), failed) << cntl.ErrorText();
ASSERT_EQ(cntl.ErrorCode(), error_code);
}

TEST_F(ServerTest, bind_client_host_and_network_device) {
butil::EndPoint ep;
ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
brpc::Server server;
EchoServiceImpl service;
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
brpc::ServerOptions opt;
ASSERT_EQ(0, server.Start(ep, &opt));

brpc::Controller cntl;
brpc::ChannelOptions copt;
copt.client_host = "localhost";
copt.device_name = "lo";
std::vector<brpc::ConnectionType> connection_types = {
brpc::CONNECTION_TYPE_SINGLE,
brpc::CONNECTION_TYPE_POOLED,
brpc::CONNECTION_TYPE_SHORT
};
for (auto connect_type : connection_types) {
copt.connection_type = connect_type;
TestClientHost(ep, cntl, 0, false, copt);
cntl.Reset();
}

ASSERT_EQ(0, server.Stop(0));
ASSERT_EQ(0, server.Join());
}

} //namespace
Loading