Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableOltpSink = serviceConfig.GetEnableOltpSink();
kqpConfig.EnableHtapTx = serviceConfig.GetEnableHtapTx();
kqpConfig.EnableStreamWrite = serviceConfig.GetEnableStreamWrite();
kqpConfig.EnableBatchUpdates = serviceConfig.GetEnableBatchUpdates();
kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode();
kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit();
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
Expand Down
16 changes: 14 additions & 2 deletions ydb/core/kqp/opt/kqp_opt_kql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -964,12 +964,18 @@ TExprNode::TPtr HandleWriteTable(const TKiWriteTable& write, TExprContext& ctx,
TExprNode::TPtr HandleUpdateTable(const TKiUpdateTable& update, TExprContext& ctx, TKqpOptimizeContext& kqpCtx,
const TKikimrTablesData& tablesData, bool withSystemColumns)
{
Y_UNUSED(kqpCtx);
const auto& tableData = GetTableData(tablesData, update.DataSink().Cluster(), update.Table().Value());
if (!CheckWriteToIndex(update, tableData, ctx) || !CheckDisabledWriteToUniqIndex(update, tableData, ctx)) {
return nullptr;
}

const bool allowBatchUpdates = kqpCtx.Config->EnableBatchUpdates && kqpCtx.Config->EnableOltpSink;
if (!allowBatchUpdates && update.IsBatch() == "true") {
const TString err = "BATCH operations are not supported at the current time.";
ctx.AddError(YqlIssue(ctx.GetPosition(update.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, err));
return nullptr;
}

if (HasIndexesToWrite(tableData)) {
return BuildUpdateTableWithIndex(update, tableData, withSystemColumns, ctx).Ptr();
} else {
Expand All @@ -980,12 +986,18 @@ TExprNode::TPtr HandleUpdateTable(const TKiUpdateTable& update, TExprContext& ct
TExprNode::TPtr HandleDeleteTable(const TKiDeleteTable& del, TExprContext& ctx, TKqpOptimizeContext& kqpCtx,
const TKikimrTablesData& tablesData, bool withSystemColumns)
{
Y_UNUSED(kqpCtx);
auto& tableData = GetTableData(tablesData, del.DataSink().Cluster(), del.Table().Value());
if (!CheckWriteToIndex(del, tableData, ctx) || !CheckDisabledWriteToUniqIndex(del, tableData, ctx)) {
return nullptr;
}

const bool allowBatchUpdates = kqpCtx.Config->EnableBatchUpdates && kqpCtx.Config->EnableOltpSink;
if (!allowBatchUpdates && del.IsBatch() == "true") {
const TString err = "BATCH operations are not supported at the current time.";
ctx.AddError(YqlIssue(ctx.GetPosition(del.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, err));
return nullptr;
}

if (HasIndexesToWrite(tableData)) {
return BuildDeleteTableWithIndex(del, tableData, withSystemColumns, ctx).Ptr();
} else {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableOltpSink = false;
bool EnableHtapTx = false;
bool EnableStreamWrite = false;
bool EnableBatchUpdates = false;
NKikimrConfig::TTableServiceConfig_EBlockChannelsMode BlockChannelsMode;
bool EnableSpilling = true;
ui32 DefaultCostBasedOptimizationLevel = 4;
Expand Down
14 changes: 9 additions & 5 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
return;
}

if (QueryState->TxCtx->EnableOltpSink.value_or(false) && isBatchQuery && (!tx || !tx->IsLiteralTx())) {
if (isBatchQuery && (!tx || !tx->IsLiteralTx())) {
ExecutePartitioned(tx);
} else if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) {
ExecuteDeferredEffectsImmediately(tx);
Expand All @@ -1408,17 +1408,21 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
void ExecutePartitioned(const TKqpPhyTxHolder::TConstPtr& tx) {
if (!Settings.TableService.GetEnableBatchUpdates()) {
return ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"BATCH operations are disabled by EnableBatchUpdates flag.");
"BATCH operations are not supported at the current time.");
}

if (!QueryState->TxCtx->EnableOltpSink.value_or(false)) {
return ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"BATCH operations are not supported at the current time.");
}

if (QueryState->TxCtx->HasOlapTable) {
return ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"BATCH operations are not supported for column tables at the current time.");
}

if (QueryState->HasTxControl()) {
NYql::TIssues issues;
return ReplyQueryError(::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST,
if (!QueryState->HasImplicitTx()) {
return ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"BATCH operation can be executed only in the implicit transaction mode.");
}

Expand Down
29 changes: 25 additions & 4 deletions ydb/core/kqp/ut/batch_operations/kqp_batch_delete_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ using namespace NYdb::NQuery;

namespace {

NKikimrConfig::TAppConfig GetAppConfig(size_t maxBatchSize = 10000, size_t partitionLimit = 10) {
NKikimrConfig::TAppConfig GetAppConfig(size_t maxBatchSize = 10000, size_t partitionLimit = 10, bool enableOltpSink = true, bool enableBatchUpdates = true) {
auto app = NKikimrConfig::TAppConfig();
app.MutableTableServiceConfig()->SetEnableOlapSink(true);
app.MutableTableServiceConfig()->SetEnableOltpSink(true);
app.MutableTableServiceConfig()->SetEnableBatchUpdates(true);
app.MutableTableServiceConfig()->SetEnableOltpSink(enableOltpSink);
app.MutableTableServiceConfig()->SetEnableBatchUpdates(enableBatchUpdates);
app.MutableTableServiceConfig()->MutableBatchOperationSettings()->SetMaxBatchSize(maxBatchSize);
app.MutableTableServiceConfig()->MutableBatchOperationSettings()->SetPartitionExecutionLimit(partitionLimit);
return app;
Expand Down Expand Up @@ -578,10 +578,31 @@ Y_UNIT_TEST_SUITE(KqpBatchDelete) {
)");

auto result = session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "BATCH operation can be executed only in the implicit transaction mode.", result.GetIssues().ToString());
}
}

Y_UNIT_TEST_QUAD(DisableFlags, UseSink, UseBatchUpdates) {
TKikimrRunner kikimr(GetAppConfig(10000, 10, UseSink, UseBatchUpdates));
auto db = kikimr.GetQueryClient();
auto session = db.GetSession().GetValueSync().GetSession();

{
auto query = Q_(R"(
BATCH DELETE FROM KeyValue
WHERE Key >= 3;
)");

auto result = session.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync();
if (UseSink && UseBatchUpdates) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "BATCH operations are not supported at the current time.", result.GetIssues().ToString());
}
}
}
}

} // namespace NKqp
Expand Down
30 changes: 26 additions & 4 deletions ydb/core/kqp/ut/batch_operations/kqp_batch_update_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ using namespace NYdb::NQuery;

namespace {

NKikimrConfig::TAppConfig GetAppConfig(size_t maxBatchSize = 10000, size_t partitionLimit = 10) {
NKikimrConfig::TAppConfig GetAppConfig(size_t maxBatchSize = 10000, size_t partitionLimit = 10, bool enableOltpSink = true, bool enableBatchUpdates = true) {
auto app = NKikimrConfig::TAppConfig();
app.MutableTableServiceConfig()->SetEnableOlapSink(true);
app.MutableTableServiceConfig()->SetEnableOltpSink(true);
app.MutableTableServiceConfig()->SetEnableBatchUpdates(true);
app.MutableTableServiceConfig()->SetEnableOltpSink(enableOltpSink);
app.MutableTableServiceConfig()->SetEnableBatchUpdates(enableBatchUpdates);
app.MutableTableServiceConfig()->MutableBatchOperationSettings()->SetMaxBatchSize(maxBatchSize);
app.MutableTableServiceConfig()->MutableBatchOperationSettings()->SetPartitionExecutionLimit(partitionLimit);
return app;
Expand Down Expand Up @@ -717,10 +717,32 @@ Y_UNIT_TEST_SUITE(KqpBatchUpdate) {
)");

auto result = session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "BATCH operation can be executed only in the implicit transaction mode.", result.GetIssues().ToString());
}
}

Y_UNIT_TEST_QUAD(DisableFlags, UseSink, UseBatchUpdates) {
TKikimrRunner kikimr(GetAppConfig(10000, 10, UseSink, UseBatchUpdates));
auto db = kikimr.GetQueryClient();
auto session = db.GetSession().GetValueSync().GetSession();

{
auto query = Q_(R"(
BATCH UPDATE KeyValue
SET Value = "None"
WHERE Key IN [1, 3, 5];
)");

auto result = session.ExecuteQuery(query, TTxControl::NoTx()).ExtractValueSync();
if (UseSink && UseBatchUpdates) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::PRECONDITION_FAILED);
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "BATCH operations are not supported at the current time.", result.GetIssues().ToString());
}
}
}
}

} // namespace NKqp
Expand Down
Loading