Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1902a89
remove obsolete README.md for Input-Process-Output (ipo)
flarco Jan 6, 2026
8aa4d5d
add mutex protection for schemata merging in GetSchemataAll
flarco Jan 6, 2026
6ca9ed7
update GetSchemataAll to use SchemataLevelColumn and add correspondin…
flarco Jan 6, 2026
eafebc6
update Go version to 1.25 in build and test workflows
flarco Jan 6, 2026
4dde4a0
add support for SLING_SYNCED_AT_COLUMN and implement related tests
flarco Jan 7, 2026
6623fa4
add support for _sling_synced_op column and implement related checks …
flarco Jan 7, 2026
7a944fc
refactor: replace hardcoded column names with reserved fields
flarco Jan 7, 2026
d8e3bf4
refactor: reorder ReservedFields struct for consistency
flarco Jan 7, 2026
5a8b425
fix: enhance CastToStringE to handle time.Time and *time.Time types
flarco Jan 8, 2026
f13188a
fix(ddl): correctly add primary key with table options
flarco Jan 8, 2026
9147579
refactor: clean up commented-out transform logic in replication test
flarco Jan 8, 2026
ac4db99
feat(replication): introduce definition-only mode
flarco Jan 8, 2026
6d0cdd8
fix(connection): update Hash method to exclude specified keys and adj…
flarco Jan 9, 2026
64363da
fix(tests): add output directory definition for parquet file generation
flarco Jan 9, 2026
35ec9bb
feat(db2): add initial support for DB2 database
flarco Jan 9, 2026
6ca9196
fix(yaml): correct merge_update_insert placeholder and update concat …
flarco Jan 9, 2026
d1ee4db
fix(yaml): restore merge_update_insert placeholder in base.yaml
flarco Jan 9, 2026
7bc65e2
feat(oracle): add support for XMLTYPE column transfer to BigQuery and…
flarco Jan 13, 2026
f32e80a
fix(connection): update cache usage in database connection initializa…
flarco Jan 13, 2026
f9b5cc6
feat(mysql): add tests for LOAD DATA LOCAL INFILE and NULL handling; …
flarco Jan 13, 2026
8e5dce5
feat(stream): add description field to StreamState and update state s…
flarco Jan 14, 2026
862d4e4
move Marker variable for consistent CLI branding
flarco Jan 14, 2026
1386f22
fix(version): allow SLING_AGENT_ID to bypass dev build expiration check
flarco Jan 16, 2026
d838aa9
fix(deps): update godbc dependency to v0.0.4 and remove v0.0.3
flarco Jan 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
- name: Set up GoLang
uses: actions/setup-go@v3
with:
go-version: "1.24"
go-version: "1.25"
cache: false

- name: Load Secrets
Expand Down Expand Up @@ -88,7 +88,7 @@ jobs:
- name: Set up GoLang
uses: actions/setup-go@v3
with:
go-version: "1.24"
go-version: "1.25"
cache: false

- name: Load Secrets
Expand Down Expand Up @@ -137,7 +137,7 @@ jobs:
- name: Set up GoLang
uses: actions/setup-go@v3
with:
go-version: "1.24"
go-version: "1.25"
cache: false

- name: Load Secrets
Expand Down Expand Up @@ -199,7 +199,7 @@ jobs:
- name: Set up GoLang
uses: actions/setup-go@v3
with:
go-version: "1.24"
go-version: "1.25"
cache: false

- name: Load Secrets
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test-docker-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: "1.24"
go-version: "1.25"
cache: false

- name: Load Secrets
Expand Down Expand Up @@ -65,7 +65,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: "1.24"
go-version: "1.25"
cache: true

- name: Load Secrets
Expand Down Expand Up @@ -115,7 +115,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: "1.24"
go-version: "1.25"
cache: false

- name: Load Secrets
Expand Down Expand Up @@ -158,7 +158,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: "1.24"
go-version: "1.25"
cache: false

- name: Load Secrets
Expand Down
5 changes: 2 additions & 3 deletions cmd/sling/sling_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,10 @@ runReplication:
defer connection.CloseAll()

if !env.IsThreadChild {
text := "Sling CLI | https://slingdata.io"
if env.NoColor {
g.Info(text)
g.Info(env.Marker)
} else {
g.Info(env.CyanString(text))
g.Info(env.CyanString(env.Marker))
}

// check for update, and print note
Expand Down
9 changes: 8 additions & 1 deletion cmd/sling/sling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ var connMap = map[dbio.Type]connTest{
dbio.TypeFileGoogleDrive: {name: "google_drive"},
dbio.TypeFileFtp: {name: "ftp_test_url"},
dbio.TypeFileAzureABFS: {name: "fabric_lake"},
dbio.Type("db2"): {name: "db2", adjustCol: g.Bool(false)},
}

func init() {
Expand Down Expand Up @@ -782,7 +783,9 @@ func runOneTask(t *testing.T, file g.FileItem, connType dbio.Type) {
// skip those
if g.In(srcType, dbio.TypeDbMongoDB, dbio.TypeDbAzureTable) ||
g.In(tgtType, dbio.TypeDbMongoDB, dbio.TypeDbAzureTable) ||
taskCfg.TgtConn.IsADBC() || taskCfg.SrcConn.IsADBC() {
taskCfg.TgtConn.IsADBC() || taskCfg.SrcConn.IsADBC() ||
taskCfg.TgtConn.Type == dbio.TypeDbODBC ||
taskCfg.SrcConn.Type == dbio.TypeDbODBC {
continue
}

Expand Down Expand Up @@ -1096,6 +1099,10 @@ func TestSuiteDatabaseIceberg(t *testing.T) {
// testSuite(t, dbio.TypeDbIceberg, "1-4,6-12")
}

func TestSuiteDatabaseDB2(t *testing.T) {
testSuite(t, dbio.Type("db2"), "1-5,7+")
}

func TestSuiteDatabaseSQLServer(t *testing.T) {
t.Parallel()
testSuite(t, dbio.TypeDbSQLServer)
Expand Down
222 changes: 222 additions & 0 deletions cmd/sling/tests/replications/r.85.mssql_postgres_synced_at.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
source: MSSQL
target: POSTGRES

env:
SLING_SYNCED_AT_COLUMN: true

hooks:
start:
# Create table 1 with 10 rows
- type: query
connection: '{source.name}'
query: |
IF OBJECT_ID('dbo.synced_at_test1', 'U') IS NOT NULL DROP TABLE dbo.synced_at_test1;
CREATE TABLE dbo.synced_at_test1 (
id INT PRIMARY KEY,
name NVARCHAR(100),
value DECIMAL(10,2),
modified_at DATETIME DEFAULT GETDATE()
);
INSERT INTO dbo.synced_at_test1 (id, name, value) VALUES
(1, 'Row 1', 100.00), (2, 'Row 2', 200.00), (3, 'Row 3', 300.00),
(4, 'Row 4', 400.00), (5, 'Row 5', 500.00), (6, 'Row 6', 600.00),
(7, 'Row 7', 700.00), (8, 'Row 8', 800.00), (9, 'Row 9', 90.00),
(10, 'Row 10', 1000.00);

# Create table 2 with 8 rows (IDs 1-8), IDs 9 and 10 will be missing for soft delete test
- type: query
connection: '{source.name}'
query: |
IF OBJECT_ID('dbo.synced_at_test2', 'U') IS NOT NULL DROP TABLE dbo.synced_at_test2;
CREATE TABLE dbo.synced_at_test2 (
id INT PRIMARY KEY,
name NVARCHAR(100),
value DECIMAL(10,2),
modified_at DATETIME DEFAULT GETDATE()
);
INSERT INTO dbo.synced_at_test2 (id, name, value) VALUES
(1, 'Item 1', 10.00), (2, 'Item 2', 20.00), (3, 'Item 3', 30.00),
(4, 'Item 4', 40.00), (5, 'Item 5', 50.00), (6, 'Item 6', 60.00),
(7, 'Item 7', 70.00), (8, 'Item 8', 80.00);

end:
- type: check
check: execution.status.error == 0
on_failure: break

# Verify _sling_synced_at EXISTS for table1
- type: query
connection: '{target.name}'
query: |
SELECT COUNT(*) as col_exists
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'synced_at_test1'
AND column_name = '_sling_synced_at'
into: synced_at_exists_t1

- type: check
check: int_parse(store.synced_at_exists_t1[0].col_exists) == 1
failure_message: "_sling_synced_at column missing in synced_at_test1"

- type: log
message: "SUCCESS: _sling_synced_at column exists in synced_at_test1"

# Verify _sling_deleted_at does NOT EXIST for table1
- type: query
connection: '{target.name}'
query: |
SELECT COUNT(*) as col_exists
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'synced_at_test1'
AND column_name = '_sling_deleted_at'
into: deleted_at_exists_t1

- type: check
check: int_parse(store.deleted_at_exists_t1[0].col_exists) == 0
failure_message: "_sling_deleted_at column should NOT exist in synced_at_test1"

- type: log
message: "SUCCESS: _sling_deleted_at column does NOT exist in synced_at_test1"

# Verify data type is timestamp
- type: query
connection: '{target.name}'
query: |
SELECT data_type
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'synced_at_test1'
AND column_name = '_sling_synced_at'
into: synced_at_type

- type: check
check: contains(store.synced_at_type[0].data_type, "timestamp")
failure_message: "_sling_synced_at should be timestamp type, got {store.synced_at_type[0].data_type}"

# Verify row counts for table1
- type: query
connection: '{target.name}'
query: SELECT COUNT(*) as count FROM public.synced_at_test1
into: t1_count

- type: check
check: int_parse(store.t1_count[0].count) == 10
failure_message: "Expected 10 rows in synced_at_test1, got {store.t1_count[0].count}"

# Verify synced_at_test1 has different _sling_synced_at values (rows 9,10 should have later timestamp)
- type: query
connection: '{target.name}'
query: |
SELECT COUNT(DISTINCT _sling_synced_at) as distinct_synced_at
FROM public.synced_at_test1
into: distinct_synced_at

- type: check
check: int_parse(store.distinct_synced_at[0].distinct_synced_at) == 2
failure_message: "Expected 2 distinct _sling_synced_at values (IDs 1-8 vs 9-10), got {store.distinct_synced_at[0].distinct_synced_at}"

- type: log
message: "SUCCESS: synced_at_test1 has 2 distinct _sling_synced_at values"

# Verify _sling_synced_op column EXISTS
- type: query
connection: '{target.name}'
query: |
SELECT COUNT(*) as col_exists
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'synced_at_test1'
AND column_name = '_sling_synced_op'
into: synced_op_exists

- type: check
check: int_parse(store.synced_op_exists[0].col_exists) == 1
failure_message: "_sling_synced_op column missing"

- type: log
message: "SUCCESS: _sling_synced_op column exists"

# Verify soft-deleted rows (IDs 9-10) have _sling_synced_op = 'D'
- type: query
connection: '{target.name}'
query: |
SELECT COUNT(*) as deleted_count
FROM public.synced_at_test1
WHERE id IN (9, 10) AND _sling_synced_op = 'D'
into: deleted_op_count

- type: check
check: int_parse(store.deleted_op_count[0].deleted_count) == 2
failure_message: "Expected 2 rows with _sling_synced_op='D' for IDs 9,10, got {store.deleted_op_count[0].deleted_count}"

- type: log
message: "SUCCESS: IDs 9,10 have _sling_synced_op='D' (soft deleted)"

# Verify updated rows (IDs 1-8) have _sling_synced_op = 'U'
- type: query
connection: '{target.name}'
query: |
SELECT COUNT(*) as updated_count
FROM public.synced_at_test1
WHERE id BETWEEN 1 AND 8 AND _sling_synced_op = 'U'
into: updated_op_count

- type: check
check: int_parse(store.updated_op_count[0].updated_count) == 8
failure_message: "Expected 8 rows with _sling_synced_op='U' for IDs 1-8, got {store.updated_op_count[0].updated_count}"

- type: log
message: "SUCCESS: IDs 1-8 have _sling_synced_op='U' (updated)"

# Cleanup
- type: query
connection: '{source.name}'
query: |
IF OBJECT_ID('dbo.synced_at_test1', 'U') IS NOT NULL DROP TABLE dbo.synced_at_test1;
IF OBJECT_ID('dbo.synced_at_test2', 'U') IS NOT NULL DROP TABLE dbo.synced_at_test2;

- type: query
connection: '{target.name}'
query: |
DROP TABLE IF EXISTS public.synced_at_test1 CASCADE;

streams:
dbo.synced_at_test1:
object: public.synced_at_test1
mode: full-refresh
primary_key: [id]
target_options:
column_casing: lower

hooks:
post:
# Verify all rows have _sling_synced_op = 'I' after full-refresh insert
- type: query
connection: '{target.name}'
query: |
SELECT COUNT(*) as insert_count
FROM public.synced_at_test1
WHERE _sling_synced_op = 'I'
into: insert_op_count

- type: check
check: int_parse(store.insert_op_count[0].insert_count) == 10
failure_message: "Expected 10 rows with _sling_synced_op='I' after full-refresh, got {store.insert_op_count[0].insert_count}"

- type: log
message: "SUCCESS: All 10 rows have _sling_synced_op='I' after full-refresh"

# Sleep 2 seconds to ensure different _sling_synced_at timestamps
- type: query
connection: '{target.name}'
query: SELECT pg_sleep(2)

dbo.synced_at_test2:
object: public.synced_at_test1
mode: incremental
primary_key: [id]
target_options:
delete_missing: soft
column_casing: lower
Loading