Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions src/spanner/src/result_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::database_client::DatabaseClient;
use crate::error::internal_error;
use crate::google::spanner::v1::{self, PartialResultSet};
use crate::model::ResultSetStats;
use crate::model::result_set_stats::RowCount;
use crate::precommit::PrecommitTokenTracker;
use crate::read_only_transaction::{ReadContextTransactionSelector, TransactionState};
use crate::result_set_metadata::ResultSetMetadata;
Expand Down Expand Up @@ -232,6 +233,41 @@ impl ResultSet {
self.stats.as_ref()
}

/// Returns the number of rows modified by the DML statement, if available.
///
/// # Example
/// ```
/// # use google_cloud_spanner::client::{DatabaseClient, Statement, ResultSet};
/// # async fn check_update_count(db_client: &DatabaseClient) -> Result<(), Box<dyn std::error::Error>> {
/// let runner = db_client.read_write_transaction().build().await?;
/// runner.run(async |tx| {
Comment thread
olavloite marked this conversation as resolved.
/// let stmt = Statement::builder("UPDATE Singers SET LastName = 'Simpson' WHERE SingerId = @id THEN RETURN SingerId, LastName")
/// .add_param("id", &123_i64)
/// .build();
/// let mut rs = tx.execute_query(stmt).await?;
/// while let Some(row) = rs.next().await.transpose()? {
/// // Process returned rows
/// }
/// if let Some(count) = rs.update_count() {
/// println!("Rows modified: {}", count);
/// }
/// Ok(())
/// }).await?;
/// # Ok(())
/// # }
/// ```
///
/// Returns the number of rows modified when this [`ResultSet`] was produced from a
/// DML statement with a `THEN RETURN` clause.
pub fn update_count(&self) -> Option<i64> {
self.stats.as_ref().and_then(|s| {
s.row_count.as_ref().map(|rc| match rc {
RowCount::RowCountExact(c) => *c,
RowCount::RowCountLowerBound(c) => *c,
})
})
}

/// Fetches the next row from the result set.
///
/// # Example
Expand Down Expand Up @@ -708,6 +744,7 @@ pub(crate) mod tests {
MultiplexedSessionPrecommitToken, PartialResultSet, ResultSetMetadata, Session, StructType,
};
use spanner_grpc_mock::start;
use spanner_v1::result_set_stats::RowCount;
use std::time::Duration;

mockall::mock! {
Expand Down Expand Up @@ -2339,6 +2376,32 @@ pub(crate) mod tests {
Ok(())
}

#[tokio_test_no_panics]
async fn result_set_update_count() -> anyhow::Result<()> {
let mock_stats = spanner_v1::ResultSetStats {
row_count: Some(RowCount::RowCountExact(42_i64)),
..Default::default()
};

let mut result_set = run_mock_query(vec![PartialResultSet {
metadata: metadata(2),
values: vec![string_val("a"), string_val("b")],
last: true,
stats: Some(mock_stats),
..Default::default()
}])
.await;

result_set.next().await.transpose()?;

let update_count = result_set
.update_count()
.expect("Expected update count to be populated");
assert_eq!(update_count, 42, "Expected exactly 42 rows updated");

Ok(())
}

#[tokio_test_no_panics]
async fn result_set_duplicate_stats() -> anyhow::Result<()> {
let mock_stats = spanner_v1::ResultSetStats {
Expand Down
158 changes: 158 additions & 0 deletions tests/spanner/src/dml_returning.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Result;
use google_cloud_spanner::client::{DatabaseClient, Mutation, Statement};
use google_cloud_test_utils::resource_names::LowercaseAlphanumeric;

pub async fn dml_then_return_execute_query(db_client: &DatabaseClient) -> Result<()> {
let run_id = LowercaseAlphanumeric.random_string(10);
let id = format!("dml-ret-q-{}", run_id);

// 1. Insert initial test row
let write_tx = db_client.write_only_transaction().build();
let mutation = Mutation::new_insert_builder("AllTypes")
.set("Id")
.to(&id)
.set("ColBool")
.to(&false)
.build();
write_tx.write_at_least_once(vec![mutation]).await?;

// 2. Execute DML with THEN RETURN via execute_query in a Read-Write transaction
let runner = db_client.read_write_transaction().build().await?;
let result = runner
.run(async |tx| {
Comment thread
olavloite marked this conversation as resolved.
let id = id.clone();
let stmt = Statement::builder(
"UPDATE AllTypes SET ColBool = true WHERE Id = @id THEN RETURN Id, ColBool",
)
.add_param("id", &id)
.build();

let mut result_set = tx.execute_query(stmt).await?;
let row = result_set
.next()
.await
.transpose()?
.expect("Expected to find returned DML row");

let returned_id: String = row.get("Id");
let col_bool: bool = row.get("ColBool");

assert_eq!(returned_id, id, "Row ID mismatch");
assert!(col_bool, "ColBool should have been updated to true");

// Verify that stats / update_count are available after fully consuming the stream
assert!(
result_set.next().await.is_none(),
"Expected no additional rows"
);
let update_count = result_set
.update_count()
.expect("Expected update_count to be populated");
assert_eq!(update_count, 1, "Expected exactly 1 row updated");

Ok(())
})
.await;

result?;
Ok(())
}

pub async fn dml_then_return_execute_update(db_client: &DatabaseClient) -> Result<()> {
let run_id = LowercaseAlphanumeric.random_string(10);
let id = format!("dml-ret-u-{}", run_id);

// 1. Insert initial test row
let write_tx = db_client.write_only_transaction().build();
let mutation = Mutation::new_insert_builder("AllTypes")
.set("Id")
.to(&id)
.set("ColBool")
.to(&false)
.build();
write_tx.write_at_least_once(vec![mutation]).await?;

// 2. Execute DML with THEN RETURN via execute_update in a Read-Write transaction
let runner = db_client.read_write_transaction().build().await?;
let result = runner
.run(async |tx| {
Comment thread
olavloite marked this conversation as resolved.
let id = id.clone();
let stmt = Statement::builder(
"UPDATE AllTypes SET ColBool = true WHERE Id = @id THEN RETURN Id, ColBool",
)
.add_param("id", &id)
.build();

let update_count = tx.execute_update(stmt).await?;
assert_eq!(
update_count, 1,
"Expected execute_update to return exactly 1 modified row count"
);

Ok(())
})
.await;

result?;
Ok(())
}

pub async fn dml_then_return_unconsumed_query(db_client: &DatabaseClient) -> Result<()> {
let run_id = LowercaseAlphanumeric.random_string(10);
let id = format!("dml-ret-uncon-{}", run_id);

// 1. Execute DML with THEN RETURN via execute_query, but do NOT read any returned rows!
let runner = db_client.read_write_transaction().build().await?;
let result = runner
.run(async |tx| {
Comment thread
olavloite marked this conversation as resolved.
let id = id.clone();
let stmt = Statement::builder(
"INSERT INTO AllTypes (Id, ColBool) VALUES (@id, @bool) THEN RETURN Id",
)
.add_param("id", &id)
.add_param("bool", &true)
.build();

// Execute but deliberately do not consume the rows
let _result_set = tx.execute_query(stmt).await?;

Ok(())
})
.await;

result?;

// 2. Issue a separate single-use read transaction to prove that Spanner successfully executed it anyway E2E
let read_tx = db_client.single_use().build();
let stmt = Statement::builder("SELECT Id, ColBool FROM AllTypes WHERE Id = @id")
.add_param("id", &id)
.build();
let mut result_set = read_tx.execute_query(stmt).await?;
let row = result_set
.next()
.await
.transpose()?
.expect("Expected to find row inserted by unconsumed query");

let returned_id: String = row.get("Id");
let col_bool: bool = row.get("ColBool");

assert_eq!(returned_id, id, "Row ID mismatch");
assert!(col_bool, "ColBool must be true");

Ok(())
}
1 change: 1 addition & 0 deletions tests/spanner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod batch_write;
pub mod client;
pub mod concurrent_inline_begin;
pub mod directed_read;
pub mod dml_returning;
pub mod partitioned_dml;
pub mod pg_dialect;
pub mod query;
Expand Down
3 changes: 3 additions & 0 deletions tests/spanner/tests/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ mod spanner {
db_client,
)
.await?;
integration_tests_spanner::dml_returning::dml_then_return_execute_query(db_client).await?;
integration_tests_spanner::dml_returning::dml_then_return_execute_update(db_client).await?;
integration_tests_spanner::dml_returning::dml_then_return_unconsumed_query(db_client).await?;
integration_tests_spanner::read_write_transaction::consecutive_reads(db_client).await?;
integration_tests_spanner::read_write_transaction::mixed_reads_and_queries(db_client)
.await?;
Expand Down
Loading