Pull request overview
Adds write-path support to the ClickHouse EF Core provider by enabling INSERTs through SaveChanges (while continuing to fail fast on UPDATE/DELETE) and introducing a high-throughput bulk insert helper.
Changes:
- Implement `ClickHouseModificationCommandBatch`/factory to execute INSERTs via the ClickHouse driver's `InsertBinaryAsync`.
- Add a `BulkInsertAsync` extension to bypass change tracking for high-volume inserts.
- Add integration tests covering inserts, scalar type coverage, and failure behavior for UPDATE/DELETE; update the README accordingly.
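To make the new helper concrete, here is a minimal usage sketch. The `OrdersContext` and `Order` types and the exact `BulkInsertAsync` signature are illustrative assumptions, not confirmed by the PR:

```csharp
// Hypothetical usage of the new BulkInsertAsync extension.
// OrdersContext, Order, and the exact signature are assumptions for illustration.
await using var context = new OrdersContext(options);

var orders = Enumerable.Range(0, 10_000)
    .Select(i => new Order { Id = i, Amount = i * 1.5m })
    .ToList();

// Bypasses EF change tracking and writes rows via the driver's binary protocol.
await context.BulkInsertAsync(orders);
```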
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| test/EFCore.ClickHouse.Tests/WritePathFailFastTests.cs | Adjusts fail-fast tests to specifically cover UPDATE and DELETE scenarios. |
| test/EFCore.ClickHouse.Tests/InsertIntegrationTests.cs | New integration tests validating INSERT via SaveChanges and BulkInsertAsync. |
| src/EFCore.ClickHouse/Update/Internal/ClickHouseModificationCommandBatchFactory.cs | Creates a provider batch with configurable max batch size (currently references missing option). |
| src/EFCore.ClickHouse/Update/Internal/ClickHouseModificationCommandBatch.cs | New batch implementation that executes INSERTs via binary protocol and blocks UPDATE/DELETE. |
| src/EFCore.ClickHouse/Storage/Internal/IClickHouseRelationalConnection.cs | Adds GetClickHouseClient() to support native driver operations. |
| src/EFCore.ClickHouse/Storage/Internal/ClickHouseRelationalConnection.cs | Implements GetClickHouseClient() using the underlying ClickHouseDataSource. |
| src/EFCore.ClickHouse/Extensions/ClickHouseBulkInsertExtensions.cs | Adds BulkInsertAsync helper using InsertBinaryAsync directly. |
| README.md | Documents INSERT support + bulk insert and mentions configurable batch size. |
```csharp
if (_commands.Count == 0)
    return;

var client = _connection.GetClickHouseClient();
```
This method ignores the connection parameter passed by EF Core and always uses the injected _connection. That can lead to inconsistencies if EF Core is executing against a different IRelationalConnection instance (e.g., master connection, different options, or interceptors). Prefer using the connection argument (cast to IClickHouseRelationalConnection) and only fall back to _connection if needed.
```suggestion
var clickHouseConnection = (connection as IClickHouseRelationalConnection) ?? _connection;
var client = clickHouseConnection.GetClickHouseClient();
```
```csharp
// Get column names from the first command's write columns
var columns = commands[0].ColumnModifications
    .Where(cm => cm.IsWrite)
    .Select(cm => cm.ColumnName)
    .ToList();
```
columns are taken from the first command's IsWrite column set, but rows are built from each command's own IsWrite set. If EF omits a column for some entities (e.g., default values) but includes it for others, you'll end up calling InsertBinaryAsync with a fixed columns list and row arrays of different lengths / different column order. Consider grouping not only by table, but also by the exact write-column set/order, so each InsertBinaryAsync call uses matching columns and row values.
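One possible shape for that grouping, sketched under the assumption that `commands` is the per-table list already in scope here; the composite-key construction is illustrative:

```csharp
// Sketch: group by the exact ordered write-column set so each
// InsertBinaryAsync call sees a consistent column list and row shape.
var groups = commands.GroupBy(cmd => string.Join(
    "\u001F", // separator unlikely to appear in column names
    cmd.ColumnModifications.Where(cm => cm.IsWrite).Select(cm => cm.ColumnName)));

foreach (var group in groups)
{
    var groupCommands = group.ToList();
    var groupColumns = groupCommands[0].ColumnModifications
        .Where(cm => cm.IsWrite)
        .Select(cm => cm.ColumnName)
        .ToList();

    // Build rows from groupCommands and call InsertBinaryAsync once per group.
}
```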
```csharp
// Build rows as object arrays
var rows = commands.Select(cmd =>
{
    var writeColumns = cmd.ColumnModifications.Where(cm => cm.IsWrite).ToList();
    var row = new object[writeColumns.Count];
    for (var i = 0; i < writeColumns.Count; i++)
    {
        row[i] = writeColumns[i].Value ?? DBNull.Value;
    }
```
Row materialization uses writeColumns.Count (per-command) to size the row, but InsertBinaryAsync is invoked with a columns list derived from the first command. If any command has fewer/more IsWrite columns, the row shape won't match the column list and will either throw or insert misaligned values. Build rows in the same column order as the columns list (e.g., lookup by column name), and/or split the batch so each insert call has a consistent row shape.
```suggestion
// Build rows as object arrays, aligned with the shared 'columns' list
var rows = commands.Select(cmd =>
{
    // Map this command's write columns by name for lookup
    var valueByColumn = cmd.ColumnModifications
        .Where(cm => cm.IsWrite)
        .ToDictionary(cm => cm.ColumnName, cm => (object?)(cm.Value ?? DBNull.Value));

    var row = new object[columns.Count];
    for (var i = 0; i < columns.Count; i++)
    {
        var columnName = columns[i];
        if (!valueByColumn.TryGetValue(columnName, out var value))
        {
            value = DBNull.Value;
        }
        row[i] = value!;
    }
```
```csharp
// Propagate state: mark all entries as Unchanged
foreach (var command in _commands)
{
    foreach (var entry in command.Entries)
    {
        entry.EntityState = EntityState.Unchanged;
    }
}
```
This unconditionally sets tracked entries to Unchanged after executing the batch. EF Core controls this via the acceptAllChangesOnSuccess flag on SaveChanges; when that flag is false, entries must remain Added/Modified/Deleted after a successful save. The provider should not force state transitions here; let EF Core's StateManager.AcceptAllChanges handle it based on the caller's intent.
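For reference, this is the caller-side pattern the flag exists for (standard EF Core API, shown to illustrate why the provider must not force the transition itself):

```csharp
// Caller opts out of automatic state acceptance, e.g. to coordinate
// with an outer transaction before accepting changes.
var rowsAffected = context.SaveChanges(acceptAllChangesOnSuccess: false);

// ... commit the outer transaction here ...

// Only now should entries transition to Unchanged.
context.ChangeTracker.AcceptAllChanges();
```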
Suggested change: delete this state-propagation block entirely and let EF Core's `StateManager.AcceptAllChanges` run (or not) based on the caller's flag.
```csharp
public IClickHouseClient GetClickHouseClient()
{
    if (_dataSource is ClickHouseDataSource clickHouseDataSource)
        return clickHouseDataSource.GetClient();

    throw new InvalidOperationException(
        "Cannot obtain IClickHouseClient. The connection must be configured with a connection string " +
        "or ClickHouseDataSource, not a raw DbConnection.");
}
```
GetClickHouseClient() only works when _dataSource is a ClickHouseDataSource, but UseClickHouse(DbDataSource ...) / ClickHouseOptionsExtension.DataSource accept any DbDataSource. This introduces a runtime failure path for contexts configured with a non-ClickHouse DbDataSource which previously worked (at least for read scenarios). Consider either (a) restricting the public API to ClickHouseDataSource, (b) validating the type early when configuring options, or (c) providing a fallback client creation strategy when _dataSource isn't ClickHouseDataSource (if feasible).
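A minimal sketch of option (b), assuming a copy-on-write `WithDataSource`-style method on the options extension (the member names here are hypothetical):

```csharp
// Hypothetical early validation when the options extension is configured,
// so a non-ClickHouse DbDataSource fails at setup time, not on first write.
public ClickHouseOptionsExtension WithDataSource(DbDataSource dataSource)
{
    if (dataSource is not ClickHouseDataSource)
    {
        throw new ArgumentException(
            $"UseClickHouse requires a ClickHouseDataSource, but got {dataSource.GetType().Name}.",
            nameof(dataSource));
    }

    var clone = Clone();
    clone._dataSource = dataSource;
    return clone;
}
```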
**Batch size** is configurable (default 1000) — controls how many entities are accumulated before flushing to ClickHouse:

```csharp
optionsBuilder.UseClickHouse("Host=localhost", o => o.MaxBatchSize(5000));
```
This documentation shows configuring MaxBatchSize via o => o.MaxBatchSize(5000), but there is currently no MaxBatchSize option exposed on ClickHouseDbContextOptionsBuilder/ClickHouseOptionsExtension in this PR (and ClickHouseModificationCommandBatchFactory references a missing MaxBatchSize member). Either implement and document the option end-to-end, or remove/adjust this section to match the actual public API.
```suggestion
**Batch size** (currently fixed at 1000) controls how many entities are accumulated before flushing to ClickHouse.
```
```csharp
_maxBatchSize = dependencies.CurrentContext.Context.GetService<IDbContextOptions>()
    .Extensions.OfType<ClickHouseOptionsExtension>()
    .FirstOrDefault()?.MaxBatchSize ?? DefaultMaxBatchSize;
```
ClickHouseOptionsExtension in this repo currently doesn't define a MaxBatchSize member, so this won't compile (.FirstOrDefault()?.MaxBatchSize). Either add/pipe through a MaxBatchSize option on ClickHouseOptionsExtension (and surface it via ClickHouseDbContextOptionsBuilder), or remove this options lookup and keep the hard-coded default until the option exists. Also consider validating the configured value is > 0 to avoid creating batches that can never accept a command.
```suggestion
var options = dependencies.CurrentContext.Context.GetService<IDbContextOptions>();
var extension = options.Extensions.OfType<ClickHouseOptionsExtension>().FirstOrDefault();
if (extension != null)
{
    var maxBatchSizeProperty = extension.GetType().GetProperty("MaxBatchSize");
    var value = maxBatchSizeProperty?.GetValue(extension);
    if (value is int configuredMaxBatchSize && configuredMaxBatchSize > 0)
    {
        _maxBatchSize = configuredMaxBatchSize;
    }
    else
    {
        _maxBatchSize = DefaultMaxBatchSize;
    }
}
else
{
    _maxBatchSize = DefaultMaxBatchSize;
}
```