Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.Linq;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using SqlBulkSyncFunction.Helpers;
using SqlBulkSyncFunction.Models.Job;

namespace SqlBulkSyncFunction.Functions;

public partial class GetSyncJobConfig
{
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfig))]
public IActionResult GetJobConfig(
[HttpTrigger(
AuthorizationLevel.Function,
"get",
Route = "config/{area}/{id}"
)] HttpRequest req,
string area,
string id
)
{
ArgumentNullException.ThrowIfNull(req);

if (!string.IsNullOrWhiteSpace(area) &&
!string.IsNullOrWhiteSpace(id) &&
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
{
var tables = jobConfig.Tables?.ToDictionary(
key => key.Key,
value => new SyncJobConfigTableDto(
Source: value.Value,
Target: jobConfig.TargetTables?.TryGetValue(value.Key, out var target) == true && !string.IsNullOrWhiteSpace(target) ? target : value.Value,
DisableTargetIdentityInsert: jobConfig.DisableTargetIdentityInsertTables.GetValueOrDefault(value.Key),
DisableConstraintCheck: jobConfig.DisableConstraintCheckTables.GetValueOrDefault(value.Key)
)
) ?? [];
return new OkObjectResult(
new SyncJobConfigResponse(
Id: id,
Area: area,
BatchSize: jobConfig.BatchSize,
Manual: jobConfig.Manual,
Schedules: jobConfig.Schedules,
Tables: tables
)
);
}
return new NotFoundResult();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using SqlBulkSyncFunction.Helpers;
using SqlBulkSyncFunction.Models.Schema;

namespace SqlBulkSyncFunction.Functions;

public partial class GetSyncJobConfig
{
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchema))]
public async Task<IActionResult> GetJobConfigSchema(
[HttpTrigger(
AuthorizationLevel.Function,
"get",
Route = "config/{area}/{id}/schema"
)] HttpRequest req,
string area,
string id
)
{
ArgumentNullException.ThrowIfNull(req);

if (!string.IsNullOrWhiteSpace(area) &&
!string.IsNullOrWhiteSpace(id) &&
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
{
var utcNow = DateTimeOffset.UtcNow;
var syncJob = jobConfig.ToSyncJob(
null,
tokenCache: await tokenCacheService.GetTokenCache(jobConfig),
timestamp: utcNow,
expires: utcNow.AddMinutes(4),
id: id,
schedule: nameof(jobConfig.Manual),
seed: false
);

await using SqlConnection
sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };

using IDisposable
from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);

await sourceConn.OpenAsync();
await targetConn.OpenAsync();

DbInfo
sourceDbInfo = await sourceConn.QueryFirstAsync<DbInfo>(SchemaExtensions.DbInfoQuery),
targetDbInfo = await targetConn.QueryFirstAsync<DbInfo>(SchemaExtensions.DbInfoQuery);

SourceTableChangeTrackingInfo[]
sourceTableChangeTrackingInfos = [.. await sourceConn.QueryAsync<SourceTableChangeTrackingInfo>(SchemaExtensions.SourceTableChangeTrackingInfoQuery)];

targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema", logger);

var tableSchemas = (
syncJob.Tables ?? []
)
.Select(
table => TableSchema.LoadSchema(
sourceConn,
targetConn,
table,
syncJob.BatchSize,
globalChangeTracking: true
)
)
.Select(table => new
{
table.SourceTableName,
table.TargetTableName,
table.SourceVersion,
table.TargetVersion,
table.Columns
})
.ToArray();

return new OkObjectResult(
new
{
sourceDbInfo,
targetDbInfo,
tableSchemas,
sourceTableChangeTrackingInfos
}
);
}
return new NotFoundResult();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapper;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using SqlBulkSyncFunction.Helpers;
using SqlBulkSyncFunction.Models.Schema;

namespace SqlBulkSyncFunction.Functions;

public partial class GetSyncJobConfig
{
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTracking))]
public async Task<IActionResult> GetJobConfigSchemaTracking(
[HttpTrigger(
AuthorizationLevel.Function,
"get",
Route = "config/{area}/{id}/schema/tracking"
)] HttpRequest req,
string area,
string id
)
{
ArgumentNullException.ThrowIfNull(req);

if (!string.IsNullOrWhiteSpace(area) &&
!string.IsNullOrWhiteSpace(id) &&
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
{
var utcNow = DateTimeOffset.UtcNow;
var syncJob = jobConfig.ToSyncJob(
null,
tokenCache: await tokenCacheService.GetTokenCache(jobConfig),
timestamp: utcNow,
expires: utcNow.AddMinutes(4),
id: id,
schedule: nameof(jobConfig.Manual),
seed: false
);

await using SqlConnection
sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };

using IDisposable
from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);

await sourceConn.OpenAsync();
await targetConn.OpenAsync();

targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking", logger);

var rows = new List<SchemaTrackingTableRow>();
foreach (var table in syncJob.Tables ?? [])
{
var columns = sourceConn.GetColumns(table.Source);
var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns);
var targetVersion = targetConn.GetTargetVersion(table.Target);
var fromVersion = targetVersion.CurrentVersion < 0 ? 0L : targetVersion.CurrentVersion;

if (sourceVersion == null)
{
rows.Add(
new SchemaTrackingTableRow(
table.Id,
table.Source,
0,
0,
0,
-1,
targetVersion.CurrentVersion,
table.Target
)
);
continue;
}

var counts = await sourceConn.QueryFirstAsync<ChangeOperationCounts>(
SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement(table.Source),
new { FromVersion = fromVersion }
);

rows.Add(
new SchemaTrackingTableRow(
table.Id,
table.Source,
counts.Updated,
counts.Inserted,
counts.Deleted,
sourceVersion.CurrentVersion,
targetVersion.CurrentVersion,
table.Target
)
);
}

return new OkObjectResult(rows);
}
return new NotFoundResult();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using SqlBulkSyncFunction.Helpers;
using SqlBulkSyncFunction.Models.Schema;

namespace SqlBulkSyncFunction.Functions;

public partial class GetSyncJobConfig
{
/// <summary>
/// Returns change tracking primary key details for a specific configured table.
/// </summary>
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTrackingTable))]
public async Task<IActionResult> GetJobConfigSchemaTrackingTable(
[HttpTrigger(
AuthorizationLevel.Function,
"get",
Route = "config/{area}/{id}/schema/tracking/{tableId}"
)] HttpRequest req,
string area,
string id,
string tableId
)
{
ArgumentNullException.ThrowIfNull(req);

if (!string.IsNullOrWhiteSpace(area) &&
!string.IsNullOrWhiteSpace(id) &&
!string.IsNullOrWhiteSpace(tableId) &&
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
{
var utcNow = DateTimeOffset.UtcNow;
var syncJob = jobConfig.ToSyncJob(
null,
tokenCache: await tokenCacheService.GetTokenCache(jobConfig),
timestamp: utcNow,
expires: utcNow.AddMinutes(4),
id: id,
schedule: nameof(jobConfig.Manual),
seed: false
);

var table = (syncJob.Tables ?? [])
.FirstOrDefault(configuredTable => StringComparer.OrdinalIgnoreCase.Equals(configuredTable.Id, tableId));

if (table == null)
{
return new NotFoundResult();
}

await using SqlConnection
sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };

using IDisposable
from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);

await sourceConn.OpenAsync();
await targetConn.OpenAsync();

targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking/{tableId}", logger);

var columns = sourceConn.GetColumns(table.Source);
var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns);
var targetVersion = targetConn.GetTargetVersion(table.Target);
var fromVersion = targetVersion.CurrentVersion < 0 ? 0L : targetVersion.CurrentVersion;

if (sourceVersion == null)
{
return new OkObjectResult(
new SchemaTrackingPrimaryKeyDetailsResponse([], [], [])
);
}

var changedRows = await sourceConn.QueryAsync(
SqlStatementExtensions.GetChangeTrackingPrimaryKeyDetailsSelectStatement(table.Source, columns),
new { FromVersion = fromVersion }
);

var updated = new List<IReadOnlyDictionary<string, object>>();
var inserted = new List<IReadOnlyDictionary<string, object>>();
var deleted = new List<IReadOnlyDictionary<string, object>>();

foreach (var row in changedRows)
{
if (row is not IDictionary<string, object> values ||
!values.TryGetValue("Operation", out var operationValue) ||
operationValue is not string operation ||
string.IsNullOrWhiteSpace(operation))
{
continue;
}

var primaryKeyValues = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
foreach (var value in values)
{
if (!string.Equals(value.Key, "Operation", StringComparison.OrdinalIgnoreCase))
{
primaryKeyValues[value.Key] = value.Value;
}
}

switch (operation)
{
case "U":
updated.Add(primaryKeyValues);
break;
case "I":
inserted.Add(primaryKeyValues);
break;
case "D":
deleted.Add(primaryKeyValues);
break;
}
}

return new OkObjectResult(
new SchemaTrackingPrimaryKeyDetailsResponse(updated, inserted, deleted)
);
}

return new NotFoundResult();
}
}
Loading
Loading