Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions SYNC.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Overview

The sync module continuously synchronizes geospatial layer data from a third-party GraphQL API into a remote PostgreSQL database. It fetches data in pages, inserts new objects, updates deprecated ones, and tracks progress per layer using offset-based pagination.
The sync module continuously synchronizes geospatial layer data from a third-party GraphQL API into a remote PostgreSQL database. It fetches data in pages, inserts new objects, deletes deprecated ones, and tracks progress per layer using an opaque `sequence` cursor returned by the third-party API.

## Architecture

Expand All @@ -22,7 +22,7 @@ The sync module continuously synchronizes geospatial layer data from a third-par
┌──────┐ ┌───────────────────┐ ┌───────────────────┐
│Third │ │ SyncState │ │ LayerData │
│Party │ │ Repository │ │ Repository │
│Client│ │ (offset, status) │ │ (insert/update) │
│Client│ │(lastSequence,stat)│ │ (insert/delete) │
└──────┘ └───────────────────┘ └───────────────────┘
```

Expand All @@ -37,11 +37,11 @@ src/
├── externalClients/
│ ├── layersClient/
│ │ └── layersClient.ts # GraphQL client for the third-party API
│ └── layersClientModel.ts # GetLayerPage query string
│ └── layersClientModel.ts # buildLayerQuery(layerName) helper
├── dal/
│ └── repositories/
│ ├── syncStateRepository.ts # Tracks sync offset and status per layer
│ └── layerDataRepository.ts # Inserts new objects / updates deprecated ones
│ ├── syncStateRepository.ts # Tracks last sync sequence and status per layer
│ └── layerDataRepository.ts # Inserts new objects / deletes deprecated ones
├── common/
│ ├── syncConfig.ts # Static config (layers, intervals, page size, URL)
│ └── ... # Shared infra (config, constants, DI, tracing)
Expand All @@ -50,63 +50,74 @@ src/
├── syncConfig.ts # SyncConfig interface
├── syncState.ts # SyncStatus enum + SyncStateEntry interface
├── scheduler.ts # ScheduleEntry interface
└── thirdParty.ts # LayerObject, DeprecatedObject, ThirdPartyResponse
└── thirdParty.ts # LayerObject, ThirdPartyResponse
```

## How It Works

### Sync Lifecycle

1. **Startup** - `SyncManager.start()` reads the configured layers, initializes sync state for each (status: `SYNCING`, offset: `0`), and pushes them into a min-heap scheduler.
1. **Startup** - `SyncManager.start()` reads the configured layers, initializes sync state for each (status: `SYNCING`, lastSequence: `"0"`), and pushes them into a min-heap scheduler.

2. **Scheduler Loop** - The loop pops the next due layer, sleeps until its scheduled time, then calls `fetchAndSyncLayerPage()`.

3. **Page Fetch** - `layerClient.fetchPage()` sends a GraphQL query to the third-party API requesting up to `pageSize` objects starting from the current offset.
3. **Page Fetch** - `layerClient.fetchPage()` POSTs a GraphQL query (`query { <layerName> { ... } }`) to the third-party API. All pagination and identification inputs are passed via HTTP headers: `reality-id`, `requesting-sys`, `requesting-sys-name`, `sequence`, `page-size`, `Authorization`, `use-Delete-Entities`.

4. **Data Processing** - The handler orchestrates the response and delegates to `layerDataRepository`:
4. **Data Processing** - The handler reads the response (`data.<layerName>` + `extensions`) and delegates to `layerDataRepository`:
- `insertObjects()` - Batch upserts new/updated geospatial objects into the remote DB layer table.
- `deleteDeprecatedObjects()` - Batch deletes deprecated objects from the remote DB by id.
- `deleteDeprecatedObjects()` - Batch deletes deprecated objects by id (from `extensions.deletedEntitiesIds`).

5. **State Update** - `syncStateRepository` advances the offset to `nextRecord`.
5. **State Update** - `syncStateRepository` advances the stored `lastSequence` to `extensions.sequence`.

6. **Status Transition** - When a page returns 0 objects during `SYNCING`, the layer transitions to `READY` (initial sync complete).
6. **Status Transition** - When a page returns `fetchedEntitiesCount === 0` during `SYNCING`, the layer transitions to `READY` (initial sync complete).

7. **Re-schedule** - The layer is pushed back into the heap with:
- `syncIntervalMs` (500ms) while `SYNCING` (fast initial catch-up)
- `pollIntervalMs` (10 min) once `READY` (periodic polling for changes)

### Configuration

| Property | Default | Description |
|---------------------|----------------------------------|--------------------------------------------------|
| `layers` | `['obstacles']` | Layer names to sync |
| `syncIntervalMs` | `500` | Delay between pages during initial sync |
| `pollIntervalMs` | `600000` (10 min) | Delay between polls after initial sync completes |
| `pageSize` | `500` | Max records requested per page |
| `thirdPartyBaseUrl` | `http://mock-third-party/graphql`| GraphQL endpoint URL |
| Property | Default | Description |
| ---------------------- | --------------------------------- | ------------------------------------------------------ |
| `layers` | `['obstacles']` | Layer names to sync |
| `syncIntervalMs` | `500` | Delay between pages during initial sync |
| `pollIntervalMs` | `600000` (10 min) | Delay between polls after initial sync completes |
| `pageSize` | `1000` | Max records requested per page (sent as `page-size`) |
| `thirdPartyBaseUrl` | `http://mock-third-party/graphql` | GraphQL endpoint URL |
| `realityId` | `0` | Sent as the `reality-id` header (e.g. `0` = prod) |
| `requestingSystem` | `sync-layer-server_prod` | Sent as the `requesting-sys` header |
| `requestingSystemName` | `sync-layer-server` | Sent as the `requesting-sys-name` header |
| `useDeleteEntities` | `true` | Sent as `use-Delete-Entities` (include deprecated ids) |
| `auth.token` | `""` | Sent as the `Authorization` header |

## What Still Needs to Be Done

### Remote Database Integration
- [ ] **syncStateRepository** - Persist sync state (offset, status) to a `sync_state` table in the remote PostgreSQL database instead of the in-memory `Map`. Currently resets on restart.

- [ ] **syncStateRepository** - Persist sync state (lastSequence, status) to a `sync_state` table in the remote PostgreSQL database instead of the in-memory `Map`. Currently resets on restart.
- [ ] **layerDataRepository** - Implement actual SQL queries for `INSERT ... ON CONFLICT` (upsert) and `UPDATE` with JSONB merge against the remote DB. Tables should match layer names.
- [ ] **DB connection** - Set up a connection pool (e.g., `pg` / `knex` / `typeorm`) to the remote PostgreSQL instance with connection string from config/environment.

### Configuration

- [ ] **syncConfig** - Load config from the application config provider (e.g., `node-config` / environment variables) instead of hardcoded values.
- [ ] **thirdPartyBaseUrl** - Set the real third-party GraphQL endpoint URL.

### GraphQL

- [ ] **queries.ts** - Verify and adjust the GraphQL query schema to match the actual third-party API contract.

### Error Handling & Resilience

- [ ] Handle partial page failures (some objects succeed, some fail).

### Observability

- [ ] Add metrics (pages fetched, objects inserted, errors) via `prom-client`.
- [ ] Add OpenTelemetry spans for tracing sync operations.

### Testing

- [ ] Unit tests for `layerSyncHandler` (mock the repositories and client).
- [ ] Unit tests for `syncManager` scheduling logic.
- [ ] Integration tests with a real database.
41 changes: 21 additions & 20 deletions TYPEORM_AND_DB_CONNECTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ The layer data schema uses **native PostgreSQL LIST partitioning**: there is a s

### Database schema

- `**migrations/001_create_tables.sql`** - bootstrap SQL creating:
- `sync_state` - shared table tracking per-layer sync status (`layer_name`, `status`, `last_offset`, `updated_at`).
- `layer_objects` - LIST-partitioned **parent** table keyed by `layer_name`, with composite PK `(layer_name, id)` and columns `footprint` (PostGIS `geometry(Polygon, 4326) NOT NULL`) and `properties` (JSONB). `footprint` has a GiST spatial index and `CHECK` constraints enforcing validity (`ST_IsValid`) and world-extent (`Box2D ... @ Box2D(...)`). Requires the `postgis` extension. The parent stores no rows - each layer's data lives in its own partition.
- `**migrations/001_create_tables.sql`\*\* - bootstrap SQL creating:
- `sync_state` - shared table tracking per-layer sync status (`layer_name`, `status`, `last_sequence`, `updated_at`).
- `layer_objects` - LIST-partitioned **parent** table keyed by `layer_name`, with composite PK `(layer_name, id)` and columns `geom` (PostGIS `geometry(Polygon, 4326) NOT NULL`) and `properties` (JSONB). `geom` has a GiST spatial index and `CHECK` constraints enforcing validity (`ST_IsValid`) and world-extent (`Box2D ... @ Box2D(...)`). Requires the `postgis` extension. The parent stores no rows - each layer's data lives in its own partition.
- Per-layer partitions (`layer_<name>`) are **not** in the migration; they are created at runtime by `ensureLayerPartitions()` based on `sync.layers`.

### Config

- `**config/default.json`** - new `db` section (host, port, database, username, password, ssl).
- `**config/default.json`\*\* - new `db` section (host, port, database, username, password, ssl).
- `**src/types/dbConfig.ts**` - `DbConfig` interface.
- `**src/common/dbConfig.ts**` - `getDbConfig()` helper, mirrors the existing `getSyncConfig()` pattern on top of `@map-colonies/config`.

Expand All @@ -32,24 +32,24 @@ The layer data schema uses **native PostgreSQL LIST partitioning**: there is a s

### Entities

- `**src/dal/entities/syncState.ts**` - `SyncStateEntry` `@Entity('sync_state')` class with `layerName`, `status`, `lastOffset`, `updatedAt` columns (plus the existing `SyncStatus` enum).
- `**src/dal/entities/syncState.ts**` - `SyncStateEntry` `@Entity('sync_state')` class with `layerName`, `status`, `lastSequence`, `updatedAt` columns (plus the existing `SyncStatus` enum).
- `**src/dal/entities/layerObject.ts**` - a **single** `LayerObjectEntity` mapped to the partitioned parent `layer_objects`:
- Composite primary key `(layer_name, id)` (required because `layer_name` is the partition key).
- Columns: `footprint geometry(Polygon, 4326) NOT NULL` (PostGIS) with a GiST spatial index and validity / world-extent `CHECK` constraints, `properties JSONB NOT NULL DEFAULT '{}'`, `created_at TIMESTAMPTZ NOT NULL DEFAULT now()`.
- Columns: `geom geometry(Polygon, 4326) NOT NULL` (PostGIS) with a GiST spatial index and validity / world-extent `CHECK` constraints, `properties JSONB NOT NULL DEFAULT '{}'`, `created_at TIMESTAMPTZ NOT NULL DEFAULT now()`.
- `getLayerPartitionName(layerName)` helper returns the child-partition name (`layer_<layerName>`), used by `ensureLayerPartitions()`.
- `LayerObject` / `DeprecatedObject` domain types for the external API.
- `**src/dal/entities/index.ts`** - re-exports the entity class, the partition-name helper, and the types.
- `LayerObject` domain type for the external API.
- `**src/dal/entities/index.ts`\*\* - re-exports the entity class, the partition-name helper, and the types.

### Repositories (DB-backed, async)

- `**src/dal/repositories/syncStateRepository.ts**` - unchanged shape, backed by the shared `sync_state` table.
- `**src/dal/repositories/layerDataRepository.ts**` - layer-aware via the `layer_name` column, **not** via dynamic entities or table names:
- `insertObjects(layerName, objects)` - bulk insert into `layer_objects` with `layer_name` stamped on every row; uses `orIgnore()` (`ON CONFLICT DO NOTHING`) so sync retries/replays are idempotent. Postgres routes each row to the `layer_<layerName>` partition automatically; `footprint` values coming in as GeoJSON Polygons are converted to PostGIS geometry by the `pg` driver.
- `deleteDeprecatedObjects(layerName, deprecated)` - batch `DELETE FROM layer_objects WHERE layer_name = :layerName AND id IN (:...ids)`. Partition pruning limits the delete to the matching partition.
- `insertObjects(layerName, objects)` - bulk insert into `layer_objects` with `layer_name` stamped on every row; uses `orIgnore()` (`ON CONFLICT DO NOTHING`) so sync retries/replays are idempotent. Postgres routes each row to the `layer_<layerName>` partition automatically; `geom` values coming in as GeoJSON Polygons are converted to PostGIS geometry by the `pg` driver.
- `deleteDeprecatedObjects(layerName, deletedIds)` - batch `DELETE FROM layer_objects WHERE layer_name = :layerName AND id IN (:...ids)`. Partition pruning limits the delete to the matching partition.

### Wiring

- `**src/containerConfig.ts`** - reads `sync.layers` via `getSyncConfig()`, calls `await initializeDb(syncConfig.layers)` during bootstrap, logs the connection target + active layer partitions, and calls `closeDb()` alongside `getTracing().stop()` in the `onSignal` shutdown hook.
- `**src/containerConfig.ts`\*\* - reads `sync.layers` via `getSyncConfig()`, calls `await initializeDb(syncConfig.layers)` during bootstrap, logs the connection target + active layer partitions, and calls `closeDb()` alongside `getTracing().stop()` in the `onSignal` shutdown hook.
- `**src/handler/layerSyncHandler.ts**` - awaits all now-async repository calls.
- `**src/scheduler/syncManager.ts**` - `start()` is now `async` and awaits state initialization / reads.
- `**src/index.ts**` - `void syncManager.start()` to keep the fire-and-forget semantics.
Expand All @@ -66,13 +66,17 @@ The layer data schema uses **native PostgreSQL LIST partitioning**: there is a s
To onboard a new layer (e.g. `roads`):

1. Add it to `sync.layers` in `config/default.json` (or the env-specific config):
```json
"sync": { "layers": ["obstacles", "roads"], ... }
```

```json
"sync": { "layers": ["obstacles", "roads"], ... }
```

2. Restart the service. On startup, `ensureLayerPartitions()` will run:
```sql
CREATE TABLE IF NOT EXISTS "layer_roads" PARTITION OF layer_objects FOR VALUES IN ('roads');
```

```sql
CREATE TABLE IF NOT EXISTS "layer_roads" PARTITION OF layer_objects FOR VALUES IN ('roads');
```

3. No code changes are required - `insertObjects('roads', ...)` and `deleteDeprecatedObjects('roads', ...)` already take `layerName` as a parameter, and Postgres routes writes to `layer_roads` based on the `layer_name` column.

## Why LIST partitioning (and not per-layer tables or a flat table)
Expand All @@ -85,7 +89,6 @@ This matches the documented best-practice profile for LIST partitioning: a small

## Files changed


| File | Change |
| --------------------------------------------- | --------------------------------------------------------------------------------------------------------- |
| `migrations/001_create_tables.sql` | `sync_state` + LIST-partitioned `layer_objects` parent (partitions created at runtime) |
Expand All @@ -106,7 +109,6 @@ This matches the documented best-practice profile for LIST partitioning: a small
| `src/containerConfig.ts` | `initializeDb(syncConfig.layers)` on boot, `closeDb()` on signal |
| `package.json` | + `pg`, `typeorm` |


## Migration / rollout

1. Apply `migrations/001_create_tables.sql` against the target PostgreSQL database (creates `sync_state` and the partitioned `layer_objects` parent). Per-layer partitions are created automatically on first startup.
Expand All @@ -121,4 +123,3 @@ This matches the documented best-practice profile for LIST partitioning: a small
- The partition key `layer_name` is part of the primary key (required by Postgres for partitioned tables), so `(layer_name, id)` is the effective uniqueness constraint across the whole logical dataset.
- Layer names come from trusted config (`sync.layers`); they are interpolated into the `CREATE TABLE … PARTITION OF …` DDL in `ensureLayerPartitions()` - keep `sync.layers` out of any user-controlled input path.
- No HTTP routes were added; the service remains a background sync worker behind Terminus and Express middleware.

18 changes: 6 additions & 12 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,13 @@
"layers": ["obstacles"],
"syncIntervalMs": 500,
"pollIntervalMs": 600000,
"pageSize": 500,
"pageSize": 1000,
"thirdPartyBaseUrl": "http://mock-third-party/graphql",
"auth": {
"token": ""
},
"system": {
"name": "sync-layer-server",
"details": {
"description": "Sync layer service pulling geospatial layers from the third-party API into PostgreSQL",
"version": "1.0.0",
"owner": "libot"
}
}
"realityId": 0,
"requestingSystem": "sync-layer-server_prod",
"requestingSystemName": "sync-layer-server",
"useDeleteEntities": true,
"authToken": ""
},
"db": {
"type": "postgres",
Expand Down
18 changes: 10 additions & 8 deletions migrations/001_create_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,31 @@
-- `layer_objects` is a LIST-partitioned parent table on `layer_name`.
-- Per-layer partitions (layer_<name>) are created at runtime by ensureLayerPartitions()
-- in src/dal/connection.ts based on sync.layers in config.
-- Requires the PostGIS extension for the `footprint` geometry column + GiST index.
-- Requires the PostGIS extension for the `geom` geometry column + GiST index.

CREATE EXTENSION IF NOT EXISTS postgis;

CREATE TYPE "sync_status_enum" AS ENUM ('SYNCING', 'READY');

CREATE TABLE IF NOT EXISTS sync_state (
layer_name TEXT PRIMARY KEY,
Comment thread
RonIsraeli123 marked this conversation as resolved.
status TEXT NOT NULL DEFAULT 'SYNCING',
last_offset INTEGER NOT NULL DEFAULT 0,
status "sync_status_enum" NOT NULL DEFAULT 'SYNCING'::"sync_status_enum",
last_sequence TEXT NOT NULL DEFAULT '0',
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE IF NOT EXISTS layer_objects (
layer_name TEXT NOT NULL,
id TEXT NOT NULL,
footprint geometry(Polygon, 4326) NOT NULL,
geom geometry NOT NULL,
properties JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (layer_name, id),
CONSTRAINT layer_objects_valid_geometry CHECK (ST_IsValid(footprint)),
CONSTRAINT layer_objects_valid_geometry CHECK (ST_IsValid(geom)),
CONSTRAINT layer_objects_extent CHECK (
Box2D(footprint) @ Box2D(ST_GeomFromText('LINESTRING(-180 -90, 180 90)', 4326))
Box2D(geom) @ Box2D(ST_GeomFromText('LINESTRING(-180 -90, 180 90)', 4326))
)
) PARTITION BY LIST (layer_name);

CREATE INDEX IF NOT EXISTS idx_layer_objects_footprint
ON layer_objects USING GIST (footprint);
CREATE INDEX IF NOT EXISTS idx_layer_objects_geom
ON layer_objects USING GIST (geom);
Loading
Loading