Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
Expand Down Expand Up @@ -188,4 +189,93 @@ string id
}
return new NotFoundResult();
}

[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTracking))]
public async Task<IActionResult> GetJobConfigSchemaTracking(
[HttpTrigger(
AuthorizationLevel.Function,
"get",
Route ="config/{area}/{id}/schema/tracking"
)] HttpRequest req,
string area,
string id
)
{
ArgumentNullException.ThrowIfNull(req);

if (!string.IsNullOrWhiteSpace(area) &&
!string.IsNullOrWhiteSpace(id) &&
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
{
var utcNow = DateTimeOffset.UtcNow;
var syncJob = jobConfig.ToSyncJob(
null,
tokenCache: await tokenCacheService.GetTokenCache(jobConfig),
timestamp: utcNow,
expires: utcNow.AddMinutes(4),
id: id,
schedule: nameof(jobConfig.Manual),
seed: false
);

await using SqlConnection
sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };

using IDisposable
from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);

await sourceConn.OpenAsync();
await targetConn.OpenAsync();

targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking", logger);

var rows = new List<SchemaTrackingTableRow>();
foreach (var table in syncJob.Tables ?? [])
{
var columns = sourceConn.GetColumns(table.Source);
var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns);
var targetVersion = targetConn.GetTargetVersion(table.Target);
long? fromVersion = targetVersion.CurrentVersion < 0 ? null : targetVersion.CurrentVersion;

if (sourceVersion == null)
{
rows.Add(
new SchemaTrackingTableRow(
table.Source,
0,
0,
0,
-1,
targetVersion.CurrentVersion,
table.Target
)
);
continue;
}

var counts = await sourceConn.QueryFirstAsync<ChangeOperationCounts>(
SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement(table.Source),
new { FromVersion = fromVersion }
);

rows.Add(
new SchemaTrackingTableRow(
table.Source,
counts.Updated,
counts.Inserted,
counts.Deleted,
sourceVersion.CurrentVersion,
targetVersion.CurrentVersion,
table.Target
)
);
}

return new OkObjectResult(rows);
}
return new NotFoundResult();
}
}
14 changes: 14 additions & 0 deletions src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ FROM CHANGETABLE(VERSION {0}, ({1}), ({1})) as t
);
}

/// <summary>
/// Builds a query that aggregates <c>CHANGETABLE</c> rows by <c>SYS_CHANGE_OPERATION</c>,
/// producing <c>Updated</c>, <c>Inserted</c>, and <c>Deleted</c> counts (one row per table).
/// Pass <c>@FromVersion</c> as <c>NULL</c> when no row exists in <c>sync.TableVersion</c> (never synced).
/// </summary>
/// <param name="sourceTableName">Fully qualified source table name (e.g. <c>[dbo].[MyTable]</c>).</param>
public static string GetChangeTrackingOperationCountsSelectStatement(string sourceTableName)
=> $"""
SELECT ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'U' THEN 1 ELSE 0 END), 0) AS Updated,
ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'I' THEN 1 ELSE 0 END), 0) AS Inserted,
ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'D' THEN 1 ELSE 0 END), 0) AS Deleted
FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct
""";

public static string GetDropStatement(this string tableName)
=> $"""
IF OBJECT_ID('{tableName}') IS NOT NULL
Expand Down
16 changes: 16 additions & 0 deletions src/SqlBulkSyncFunction/Models/Schema/ChangeOperationCounts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace SqlBulkSyncFunction.Models.Schema;

/// <summary>
/// Maps a single row returned by <c>SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement</c> for Dapper materialization.
/// </summary>
public sealed class ChangeOperationCounts
{
/// <summary>Count of <c>SYS_CHANGE_OPERATION = N'U'</c> rows.</summary>
public long Updated { get; set; }

/// <summary>Count of <c>SYS_CHANGE_OPERATION = N'I'</c> rows.</summary>
public long Inserted { get; set; }

/// <summary>Count of <c>SYS_CHANGE_OPERATION = N'D'</c> rows.</summary>
public long Deleted { get; set; }
}
22 changes: 22 additions & 0 deletions src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
namespace SqlBulkSyncFunction.Models.Schema;

/// <summary>
/// Per-table change tracking summary: counts of rows per <c>SYS_CHANGE_OPERATION</c> from
/// <c>CHANGETABLE(CHANGES …, last_sync_version)</c>, plus current source version and last synced target version.
/// </summary>
/// <param name="SourceTableName">Qualified source table name.</param>
/// <param name="Updated">Number of changes with <c>SYS_CHANGE_OPERATION = N'U'</c>.</param>
/// <param name="Inserted">Number of changes with <c>SYS_CHANGE_OPERATION = N'I'</c>.</param>
/// <param name="Deleted">Number of changes with <c>SYS_CHANGE_OPERATION = N'D'</c>.</param>
/// <param name="SourceVersion">Current change tracking version on the source (<c>CHANGE_TRACKING_CURRENT_VERSION()</c> for database-wide tracking).</param>
/// <param name="TargetVersion">Last version stored in <c>sync.TableVersion</c> for the target table (or <c>-1</c> if never synced).</param>
/// <param name="TargetTableName">Qualified target table name.</param>
public record SchemaTrackingTableRow(
string SourceTableName,
long Updated,
long Inserted,
long Deleted,
long SourceVersion,
long TargetVersion,
string TargetTableName
);
Loading