-
Notifications
You must be signed in to change notification settings - Fork 0
add insert support #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| using ClickHouse.EntityFrameworkCore.Storage.Internal; | ||
| using Microsoft.EntityFrameworkCore; | ||
| using Microsoft.EntityFrameworkCore.Infrastructure; | ||
| using Microsoft.EntityFrameworkCore.Metadata; | ||
|
|
||
| namespace ClickHouse.EntityFrameworkCore.Extensions; | ||
|
|
||
| public static class ClickHouseBulkInsertExtensions | ||
| { | ||
| /// <summary> | ||
| /// Inserts entities into ClickHouse using the driver's native binary insert protocol. | ||
| /// This bypasses EF Core change tracking entirely and is intended for high-throughput bulk loads. | ||
| /// Entities are NOT tracked or marked as Unchanged after insert. | ||
| /// </summary> | ||
| public static async Task<long> BulkInsertAsync<TEntity>( | ||
| this DbContext context, | ||
| IEnumerable<TEntity> entities, | ||
| CancellationToken cancellationToken = default) where TEntity : class | ||
| { | ||
| var connection = context.GetService<IClickHouseRelationalConnection>(); | ||
| var client = connection.GetClickHouseClient(); | ||
|
|
||
| var entityType = context.Model.FindEntityType(typeof(TEntity)) | ||
| ?? throw new InvalidOperationException( | ||
| $"The entity type '{typeof(TEntity).Name}' is not part of the model for the current context."); | ||
|
|
||
| var tableName = entityType.GetTableName() | ||
| ?? throw new InvalidOperationException( | ||
| $"The entity type '{typeof(TEntity).Name}' is not mapped to a table."); | ||
|
|
||
| // Build column list and property accessors | ||
| var properties = entityType.GetProperties() | ||
| .Where(p => p.GetTableColumnMappings().Any()) | ||
| .ToList(); | ||
|
|
||
| var columns = properties | ||
| .Select(p => p.GetTableColumnMappings().First().Column.Name) | ||
| .ToList(); | ||
|
|
||
| var accessors = properties | ||
| .Select(p => p.GetGetter()) | ||
| .ToList(); | ||
|
|
||
| // Convert entities to row arrays | ||
| // TODO quite inefficient, update this after adding direct POCO insert to client API | ||
| var rows = entities.Select(entity => | ||
| { | ||
| var row = new object[accessors.Count]; | ||
| for (var i = 0; i < accessors.Count; i++) | ||
| { | ||
| row[i] = accessors[i].GetClrValue(entity) ?? DBNull.Value; | ||
| } | ||
| return row; | ||
| }); | ||
|
|
||
| return await client.InsertBinaryAsync(tableName, columns, rows, cancellationToken: cancellationToken); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| using System.Data; | ||
| using System.Data.Common; | ||
| using ClickHouse.Driver; | ||
| using ClickHouse.Driver.ADO; | ||
| using ClickHouse.EntityFrameworkCore.Infrastructure.Internal; | ||
| using Microsoft.EntityFrameworkCore; | ||
|
|
@@ -69,4 +70,14 @@ public override Task<IDbContextTransaction> BeginTransactionAsync( | |
| IsolationLevel isolationLevel, | ||
| CancellationToken cancellationToken = default) | ||
| => Task.FromResult<IDbContextTransaction>(new ClickHouseTransaction()); | ||
|
|
||
| public IClickHouseClient GetClickHouseClient() | ||
| { | ||
| if (_dataSource is ClickHouseDataSource clickHouseDataSource) | ||
| return clickHouseDataSource.GetClient(); | ||
|
|
||
| throw new InvalidOperationException( | ||
| "Cannot obtain IClickHouseClient. The connection must be configured with a connection string " + | ||
| "or ClickHouseDataSource, not a raw DbConnection."); | ||
| } | ||
|
Comment on lines
+74
to
+82
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,10 @@ | ||
| using ClickHouse.Driver; | ||
| using Microsoft.EntityFrameworkCore.Storage; | ||
|
|
||
| namespace ClickHouse.EntityFrameworkCore.Storage.Internal; | ||
|
|
||
| public interface IClickHouseRelationalConnection : IRelationalConnection | ||
| { | ||
| IClickHouseRelationalConnection CreateMasterConnection(); | ||
| IClickHouseClient GetClickHouseClient(); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,112 @@ | ||
| using ClickHouse.EntityFrameworkCore.Storage.Internal; | ||
| using Microsoft.EntityFrameworkCore; | ||
| using Microsoft.EntityFrameworkCore.Storage; | ||
| using Microsoft.EntityFrameworkCore.Update; | ||
|
|
||
| namespace ClickHouse.EntityFrameworkCore.Update.Internal; | ||
|
|
||
| public class ClickHouseModificationCommandBatch : ModificationCommandBatch | ||
| { | ||
| private readonly List<IReadOnlyModificationCommand> _commands = []; | ||
| private readonly int _maxBatchSize; | ||
| private bool _completed; | ||
| private bool _moreExpected; | ||
|
|
||
| public ClickHouseModificationCommandBatch(int maxBatchSize) | ||
| { | ||
| _maxBatchSize = maxBatchSize; | ||
| } | ||
|
|
||
| public override IReadOnlyList<IReadOnlyModificationCommand> ModificationCommands => _commands; | ||
|
|
||
| public override bool RequiresTransaction => false; | ||
|
|
||
| public override bool AreMoreBatchesExpected => _moreExpected; | ||
|
|
||
| public override bool TryAddCommand(IReadOnlyModificationCommand modificationCommand) | ||
| { | ||
| if (_completed) | ||
| throw new InvalidOperationException("Batch has already been completed."); | ||
|
|
||
| if (modificationCommand.EntityState is EntityState.Modified) | ||
| throw new NotSupportedException( | ||
| "UPDATE operations are not supported by the ClickHouse EF Core provider. " + | ||
| "ClickHouse mutations (ALTER TABLE ... UPDATE) are asynchronous and not OLTP-compatible."); | ||
|
|
||
| if (modificationCommand.EntityState is EntityState.Deleted) | ||
| throw new NotSupportedException( | ||
| "DELETE operations are not supported by the ClickHouse EF Core provider. " + | ||
| "ClickHouse mutations (ALTER TABLE ... DELETE) are asynchronous and not OLTP-compatible."); | ||
|
|
||
| if (modificationCommand.EntityState is not EntityState.Added) | ||
| throw new NotSupportedException( | ||
| $"Unexpected entity state '{modificationCommand.EntityState}'. " + | ||
| "The ClickHouse EF Core provider only supports INSERT (EntityState.Added)."); | ||
|
|
||
| // Block server-generated values (ClickHouse has no RETURNING / auto-increment) | ||
| foreach (var columnMod in modificationCommand.ColumnModifications) | ||
| { | ||
| if (columnMod.IsRead) | ||
| throw new NotSupportedException( | ||
| $"Server-generated values are not supported by the ClickHouse EF Core provider. " + | ||
| $"Column '{columnMod.ColumnName}' on table '{modificationCommand.TableName}' is configured " + | ||
| $"to read a value back from the database after INSERT. Remove ValueGeneratedOnAdd() or " + | ||
| $"use HasValueGenerator() with a client-side generator instead."); | ||
| } | ||
|
|
||
| if (_commands.Count >= _maxBatchSize) | ||
| return false; | ||
|
|
||
| _commands.Add(modificationCommand); | ||
| return true; | ||
| } | ||
|
|
||
| public override void Complete(bool moreBatchesExpected) | ||
| { | ||
| _completed = true; | ||
| _moreExpected = moreBatchesExpected; | ||
| } | ||
|
|
||
| public override void Execute(IRelationalConnection connection) | ||
| => ExecuteAsync(connection).GetAwaiter().GetResult(); | ||
|
|
||
| public override async Task ExecuteAsync( | ||
| IRelationalConnection connection, | ||
| CancellationToken cancellationToken = default) | ||
| { | ||
| if (_commands.Count == 0) | ||
| return; | ||
|
|
||
| var clickHouseConnection = (IClickHouseRelationalConnection)connection; | ||
| var client = clickHouseConnection.GetClickHouseClient(); | ||
|
|
||
| // Group commands by table name and write-column set for correct row alignment | ||
| var groups = _commands.GroupBy(c => ( | ||
| c.TableName, | ||
| Columns: string.Join(",", c.ColumnModifications.Where(cm => cm.IsWrite).Select(cm => cm.ColumnName)))); | ||
|
|
||
| foreach (var group in groups) | ||
| { | ||
| var tableName = group.Key.TableName; | ||
| var commands = group.ToList(); | ||
|
|
||
| var columns = commands[0].ColumnModifications | ||
| .Where(cm => cm.IsWrite) | ||
| .Select(cm => cm.ColumnName) | ||
| .ToList(); | ||
|
|
||
| var rows = commands.Select(cmd => | ||
| { | ||
| var writeColumns = cmd.ColumnModifications.Where(cm => cm.IsWrite).ToList(); | ||
| var row = new object[writeColumns.Count]; | ||
| for (var i = 0; i < writeColumns.Count; i++) | ||
| { | ||
| row[i] = writeColumns[i].Value ?? DBNull.Value; | ||
| } | ||
| return row; | ||
| }); | ||
|
|
||
| await client.InsertBinaryAsync(tableName, columns, rows, cancellationToken: cancellationToken); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,15 +1,22 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| using ClickHouse.EntityFrameworkCore.Infrastructure.Internal; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| using Microsoft.EntityFrameworkCore.Infrastructure; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| using Microsoft.EntityFrameworkCore.Update; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace ClickHouse.EntityFrameworkCore.Update.Internal; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| public class ClickHouseModificationCommandBatchFactory : IModificationCommandBatchFactory | ||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public ClickHouseModificationCommandBatchFactory(ModificationCommandBatchFactoryDependencies dependencies) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private const int DefaultMaxBatchSize = 1000; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly int _maxBatchSize; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| public ClickHouseModificationCommandBatchFactory( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ModificationCommandBatchFactoryDependencies dependencies) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| _maxBatchSize = dependencies.CurrentContext.Context.GetService<IDbContextOptions>() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| .Extensions.OfType<ClickHouseOptionsExtension>() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| .FirstOrDefault()?.MaxBatchSize ?? DefaultMaxBatchSize; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+15
to
+17
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| _maxBatchSize = dependencies.CurrentContext.Context.GetService<IDbContextOptions>() | |
| .Extensions.OfType<ClickHouseOptionsExtension>() | |
| .FirstOrDefault()?.MaxBatchSize ?? DefaultMaxBatchSize; | |
| var options = dependencies.CurrentContext.Context.GetService<IDbContextOptions>(); | |
| var extension = options.Extensions.OfType<ClickHouseOptionsExtension>().FirstOrDefault(); | |
| if (extension != null) | |
| { | |
| var maxBatchSizeProperty = extension.GetType().GetProperty("MaxBatchSize"); | |
| var value = maxBatchSizeProperty?.GetValue(extension); | |
| if (value is int configuredMaxBatchSize && configuredMaxBatchSize > 0) | |
| { | |
| _maxBatchSize = configuredMaxBatchSize; | |
| } | |
| else | |
| { | |
| _maxBatchSize = DefaultMaxBatchSize; | |
| } | |
| } | |
| else | |
| { | |
| _maxBatchSize = DefaultMaxBatchSize; | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This documentation shows configuring
MaxBatchSizeviao => o.MaxBatchSize(5000), but there is currently noMaxBatchSizeoption exposed onClickHouseDbContextOptionsBuilder/ClickHouseOptionsExtensionin this PR (andClickHouseModificationCommandBatchFactoryreferences a missingMaxBatchSizemember). Either implement and document the option end-to-end, or remove/adjust this section to match the actual public API.