Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions google/cloud/bigtable/internal/bigtable_stub_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "google/cloud/bigtable/internal/bigtable_round_robin_decorator.h"
#include "google/cloud/bigtable/internal/bigtable_tracing_stub.h"
#include "google/cloud/bigtable/internal/connection_refresh_state.h"
#include "google/cloud/bigtable/internal/defaults.h"
#include "google/cloud/bigtable/options.h"
#include "google/cloud/common_options.h"
#include "google/cloud/grpc_options.h"
Expand All @@ -47,17 +48,29 @@ std::shared_ptr<grpc::Channel> CreateGrpcChannel(
return auth.CreateChannel(options.get<EndpointOption>(), std::move(args));
}

std::string CreateFeaturesMetadata(bool is_direct_path) {
google::bigtable::v2::FeatureFlags proto;
proto.set_reverse_scans(true);
proto.set_last_scanned_row_responses(true);
proto.set_mutate_rows_rate_limit(true);
proto.set_mutate_rows_rate_limit2(true);
proto.set_routing_cookie(true);
proto.set_retry_info(true);
if (is_direct_path) {
proto.set_traffic_director_enabled(true);
proto.set_direct_access_requested(true);
}
return internal::UrlsafeBase64Encode(proto.SerializeAsString());
}

std::string FeaturesMetadata() {
static auto const* const kFeatures = new auto([] {
google::bigtable::v2::FeatureFlags proto;
proto.set_reverse_scans(true);
proto.set_last_scanned_row_responses(true);
proto.set_mutate_rows_rate_limit(true);
proto.set_mutate_rows_rate_limit2(true);
proto.set_routing_cookie(true);
proto.set_retry_info(true);
return internal::UrlsafeBase64Encode(proto.SerializeAsString());
}());
if (bigtable::internal::IsDirectPath()) {
static auto const* const kDirectPathFeatures =
new std::string(CreateFeaturesMetadata(true));
return *kDirectPathFeatures;
}
static auto const* const kFeatures =
new std::string(CreateFeaturesMetadata(false));
return *kFeatures;
}

Expand Down
83 changes: 83 additions & 0 deletions google/cloud/bigtable/internal/bigtable_stub_factory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
#include "google/cloud/grpc_options.h"
#include "google/cloud/internal/api_client_header.h"
#include "google/cloud/internal/async_streaming_read_rpc_impl.h"
#include "google/cloud/internal/base64_transforms.h"
#include "google/cloud/internal/make_status.h"
#include "google/cloud/testing_util/fake_completion_queue_impl.h"
#include "google/cloud/testing_util/mock_grpc_authentication_strategy.h"
#include "google/cloud/testing_util/opentelemetry_matchers.h"
#include "google/cloud/testing_util/scoped_log.h"
#include "google/cloud/testing_util/setenv.h"
#include "google/cloud/testing_util/status_matchers.h"
#include "google/cloud/testing_util/validate_metadata.h"
#include "google/cloud/testing_util/validate_propagator.h"
#include "google/bigtable/v2/feature_flags.pb.h"
#include <gmock/gmock.h>
#include <chrono>
#include <regex>
Expand Down Expand Up @@ -285,6 +288,86 @@ TEST(BigtableStubFactory, LoggingDisabled) {
EXPECT_THAT(log.ExtractLines(), Not(Contains(HasSubstr("MutateRow"))));
}

TEST(BigtableStubFactory, FeaturesFlagsCloudDirectPath) {
MockFactory factory;
EXPECT_CALL(factory, Call)
.WillOnce([](std::shared_ptr<grpc::Channel> const&) {
auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, MutateRow)
.WillOnce([](grpc::ClientContext& context, Options const&,
google::bigtable::v2::MutateRowRequest const&) {
ValidateMetadataFixture fixture;
auto headers = fixture.GetMetadata(context);
auto it = headers.find("bigtable-features");
EXPECT_NE(it, headers.end());
auto decoded = internal::UrlsafeBase64Decode(it->second);
EXPECT_STATUS_OK(decoded);
if (!decoded) return internal::AbortedError("fail to decode");
google::bigtable::v2::FeatureFlags proto;
EXPECT_TRUE(proto.ParseFromArray(
decoded->data(), static_cast<int>(decoded->size())));
EXPECT_TRUE(proto.traffic_director_enabled());
EXPECT_TRUE(proto.direct_access_requested());
return internal::AbortedError("fail");
});
return mock;
});

testing_util::SetEnv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH", "bigtable");
auto auth = MakeStubFactoryMockAuth();
CompletionQueue cq;
auto stub = CreateDecoratedStubs(
std::move(auth), std::move(cq),
Options{}
.set<EndpointOption>("localhost:1")
.set<GrpcNumChannelsOption>(1)
.set<UnifiedCredentialsOption>(MakeInsecureCredentials()),
factory.AsStdFunction());
grpc::ClientContext context;
(void)stub->MutateRow(context, Options{}, {});
testing_util::UnsetEnv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH");
}

TEST(BigtableStubFactory, FeaturesFlagsBigtableDirectPath) {
MockFactory factory;
EXPECT_CALL(factory, Call)
.WillOnce([](std::shared_ptr<grpc::Channel> const&) {
auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, MutateRow)
.WillOnce([](grpc::ClientContext& context, Options const&,
google::bigtable::v2::MutateRowRequest const&) {
ValidateMetadataFixture fixture;
auto headers = fixture.GetMetadata(context);
auto it = headers.find("bigtable-features");
EXPECT_NE(it, headers.end());
auto decoded = internal::UrlsafeBase64Decode(it->second);
EXPECT_STATUS_OK(decoded);
if (!decoded) return internal::AbortedError("fail to decode");
google::bigtable::v2::FeatureFlags proto;
EXPECT_TRUE(proto.ParseFromArray(
decoded->data(), static_cast<int>(decoded->size())));
EXPECT_TRUE(proto.traffic_director_enabled());
EXPECT_TRUE(proto.direct_access_requested());
return internal::AbortedError("fail");
});
return mock;
});

testing_util::SetEnv("CBT_ENABLE_DIRECTPATH", "true");
auto auth = MakeStubFactoryMockAuth();
CompletionQueue cq;
auto stub = CreateDecoratedStubs(
std::move(auth), std::move(cq),
Options{}
.set<EndpointOption>("localhost:1")
.set<GrpcNumChannelsOption>(1)
.set<UnifiedCredentialsOption>(MakeInsecureCredentials()),
factory.AsStdFunction());
grpc::ClientContext context;
(void)stub->MutateRow(context, Options{}, {});
testing_util::UnsetEnv("CBT_ENABLE_DIRECTPATH");
}

TEST(BigtableStubFactory, FeaturesFlags) {
MockFactory factory;
EXPECT_CALL(factory, Call)
Expand Down
27 changes: 16 additions & 11 deletions google/cloud/bigtable/internal/defaults.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@ int DefaultConnectionPoolSize() {
cpu_count * BIGTABLE_CLIENT_DEFAULT_CHANNELS_PER_CPU);
}

bool IsDirectPath() {
auto const direct_path =
google::cloud::internal::GetEnv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH")
.value_or("");
// Bigtable specific env var for Direct Path support used by all clients.
auto const cbt_direct_path =
google::cloud::internal::GetEnv("CBT_ENABLE_DIRECTPATH").value_or("");
return absl::c_any_of(absl::StrSplit(direct_path, ','),
[](absl::string_view v) { return v == "bigtable"; }) ||
cbt_direct_path == "true";
}

Options HandleUniverseDomain(Options opts) {
if (!opts.has<::google::cloud::bigtable_internal::DataEndpointOption>()) {
auto ep = google::cloud::internal::UniverseDomainEndpoint(
Expand Down Expand Up @@ -171,18 +183,11 @@ Options DefaultOptions(Options opts) {
}
}

auto const direct_path =
GetEnv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH").value_or("");
if (absl::c_any_of(absl::StrSplit(direct_path, ','),
[](absl::string_view v) { return v == "bigtable"; })) {
// Set the specific data endpoints if Direct Path is enabled.
if (IsDirectPath()) {
opts.set<::google::cloud::bigtable_internal::DataEndpointOption>(
"google-c2p:///directpath-bigtable.googleapis.com")
.set<AuthorityOption>("directpath-bigtable.googleapis.com");

// When using DirectPath the gRPC library already does load balancing across
// multiple sockets, it makes little sense to perform additional load
// balancing in the client library.
if (!opts.has<GrpcNumChannelsOption>()) opts.set<GrpcNumChannelsOption>(1);
Comment thread
elinorli marked this conversation as resolved.
"c2p:///bigtable.googleapis.com")
.set<AuthorityOption>("bigtable.googleapis.com");
}

auto emulator = GetEnv("BIGTABLE_EMULATOR_HOST");
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/bigtable/internal/defaults.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ namespace internal {

int DefaultConnectionPoolSize();

/**
* Returns true if Direct Path is enabled for Bigtable.
*/
bool IsDirectPath();

/**
* Returns an `Options` with the appropriate defaults for Bigtable.
*
Expand Down
68 changes: 58 additions & 10 deletions google/cloud/bigtable/internal/defaults_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -497,15 +497,16 @@ TEST(EndpointEnvTest, UserCredentialsOverrideEmulatorEnv) {
typeid(opts.get<GrpcCredentialOption>()));
}

TEST(EndpointEnvTest, DirectPathEnabled) {
TEST(EndpointEnvTest, CloudDirectPathEnabled) {
ScopedEnvironment emulator("BIGTABLE_EMULATOR_HOST", absl::nullopt);
ScopedEnvironment direct_path("GOOGLE_CLOUD_ENABLE_DIRECT_PATH",
"storage,bigtable");
ScopedEnvironment cbt_direct_path("CBT_ENABLE_DIRECTPATH", absl::nullopt);

auto opts = DefaultOptions();
EXPECT_EQ("google-c2p:///directpath-bigtable.googleapis.com",
EXPECT_EQ("c2p:///bigtable.googleapis.com",
opts.get<::google::cloud::bigtable_internal::DataEndpointOption>());
EXPECT_EQ("directpath-bigtable.googleapis.com", opts.get<AuthorityOption>());
EXPECT_EQ("bigtable.googleapis.com", opts.get<AuthorityOption>());
// Admin endpoints are not affected.
EXPECT_EQ(
"bigtableadmin.googleapis.com",
Expand All @@ -514,10 +515,31 @@ TEST(EndpointEnvTest, DirectPathEnabled) {
"bigtableadmin.googleapis.com",
opts.get<
::google::cloud::bigtable_internal::InstanceAdminEndpointOption>());
EXPECT_EQ(1, opts.get<GrpcNumChannelsOption>());
EXPECT_EQ(DefaultConnectionPoolSize(), opts.get<GrpcNumChannelsOption>());
}

TEST(EndpointEnvTest, DirectPathNoMatch) {
TEST(EndpointEnvTest, BigtableDirectPathEnabled) {
ScopedEnvironment emulator("BIGTABLE_EMULATOR_HOST", absl::nullopt);
ScopedEnvironment direct_path("GOOGLE_CLOUD_ENABLE_DIRECT_PATH",
absl::nullopt);
ScopedEnvironment cbt_direct_path("CBT_ENABLE_DIRECTPATH", "true");

auto opts = DefaultOptions();
EXPECT_EQ("c2p:///bigtable.googleapis.com",
opts.get<::google::cloud::bigtable_internal::DataEndpointOption>());
EXPECT_EQ("bigtable.googleapis.com", opts.get<AuthorityOption>());
// Admin endpoints are not affected.
EXPECT_EQ(
"bigtableadmin.googleapis.com",
opts.get<::google::cloud::bigtable_internal::AdminEndpointOption>());
EXPECT_EQ(
"bigtableadmin.googleapis.com",
opts.get<
::google::cloud::bigtable_internal::InstanceAdminEndpointOption>());
EXPECT_EQ(DefaultConnectionPoolSize(), opts.get<GrpcNumChannelsOption>());
}

TEST(EndpointEnvTest, CloudDirectPathNoMatch) {
ScopedEnvironment emulator("BIGTABLE_EMULATOR_HOST", absl::nullopt);
ScopedEnvironment direct_path("GOOGLE_CLOUD_ENABLE_DIRECT_PATH",
"bigtable-not,almost-bigtable");
Expand All @@ -527,25 +549,51 @@ TEST(EndpointEnvTest, DirectPathNoMatch) {
EXPECT_EQ("bigtable.googleapis.com", opts.get<AuthorityOption>());
}

TEST(EndpointEnvTest, DirectPathOverridesUserEndpoints) {
TEST(EndpointEnvTest, BigtableDirectPathFalse) {
ScopedEnvironment emulator("BIGTABLE_EMULATOR_HOST", absl::nullopt);
ScopedEnvironment cbt_direct_path("CBT_ENABLE_DIRECTPATH", "false");

auto opts = DefaultDataOptions(Options{});
EXPECT_EQ("bigtable.googleapis.com", opts.get<EndpointOption>());
EXPECT_EQ("bigtable.googleapis.com", opts.get<AuthorityOption>());
}

TEST(EndpointEnvTest, CloudDirectPathOverridesUserEndpoints) {
ScopedEnvironment emulator("BIGTABLE_EMULATOR_HOST", absl::nullopt);
ScopedEnvironment direct_path("GOOGLE_CLOUD_ENABLE_DIRECT_PATH", "bigtable");

auto opts = DefaultDataOptions(
Options{}.set<EndpointOption>("ignored").set<AuthorityOption>("ignored"));
EXPECT_EQ("google-c2p:///directpath-bigtable.googleapis.com",
opts.get<EndpointOption>());
EXPECT_EQ("directpath-bigtable.googleapis.com", opts.get<AuthorityOption>());
EXPECT_EQ("c2p:///bigtable.googleapis.com", opts.get<EndpointOption>());
EXPECT_EQ("bigtable.googleapis.com", opts.get<AuthorityOption>());
}

TEST(EndpointEnvTest, EmulatorOverridesDirectPath) {
TEST(EndpointEnvTest, BigtableDirectPathOverridesUserEndpoints) {
ScopedEnvironment emulator("BIGTABLE_EMULATOR_HOST", absl::nullopt);
ScopedEnvironment cbt_direct_path("CBT_ENABLE_DIRECTPATH", "true");

auto opts = DefaultDataOptions(
Options{}.set<EndpointOption>("ignored").set<AuthorityOption>("ignored"));
EXPECT_EQ("c2p:///bigtable.googleapis.com", opts.get<EndpointOption>());
EXPECT_EQ("bigtable.googleapis.com", opts.get<AuthorityOption>());
}

TEST(EndpointEnvTest, EmulatorOverridesCloudDirectPath) {
ScopedEnvironment emulator("BIGTABLE_EMULATOR_HOST", "emulator-host:8000");
ScopedEnvironment direct_path("GOOGLE_CLOUD_ENABLE_DIRECT_PATH", "bigtable");

auto opts = DefaultDataOptions(Options{});
EXPECT_EQ("emulator-host:8000", opts.get<EndpointOption>());
}

TEST(EndpointEnvTest, EmulatorOverridesBigtableDirectPath) {
ScopedEnvironment emulator("BIGTABLE_EMULATOR_HOST", "emulator-host:8000");
ScopedEnvironment cbt_direct_path("CBT_ENABLE_DIRECTPATH", "true");

auto opts = DefaultDataOptions(Options{});
EXPECT_EQ("emulator-host:8000", opts.get<EndpointOption>());
}

TEST(ConnectionRefreshRange, BothUnset) {
ScopedEnvironment emulator("BIGTABLE_EMULATOR_HOST", absl::nullopt);
auto opts = DefaultOptions();
Expand Down
Loading