Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ HORIZON_URL=https://horizon-testnet.stellar.org
HORIZON_TIMEOUT=2000
SETTLEMENT_STATUS_SYNC_INTERVAL_MS=60000
SETTLEMENT_STATUS_SYNC_TIMEOUT_MS=5000
REVENUE_LEDGER_INDEXER_INTERVAL_MS=30000
REVENUE_LEDGER_INDEXER_BATCH_SIZE=500

# -----------------------------------------------------------------------------
# Stellar / Soroban network selection
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ The request requires developer auth via `Authorization: Bearer ...` or `x-user-i

- The runtime now uses PostgreSQL-backed `SettlementStore` and `UsageStore` implementations so `/api/developers/revenue` survives process restarts.
- Unsettled usage is persisted through `revenue_ledger`, and settlement batches are persisted through `settlements`.
- A background revenue ledger indexer backfills `revenue_ledger` from `usage_events`, keyed by `usage_event_id` and resolving API ownership from `apis`.
- The in-memory store factories are still available for unit tests and isolated local scenarios.
- Apply `migrations/001_create_usage_events.sql`, `migrations/002_create_settlements.sql`, `migrations/003_create_revenue_ledger.sql`, and `migrations/005_add_persistent_store_columns.sql` before starting the API against PostgreSQL.

Expand Down
27 changes: 27 additions & 0 deletions src/__tests__/persistentStores.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import request from 'supertest';
import { DataType, newDb } from 'pg-mem';
import { createDeveloperRouter } from '../routes/developerRoutes.js';
import { errorHandler } from '../middleware/errorHandler.js';
import type { DeveloperRepository } from '../repositories/developerRepository.js';
import { createPostgresSettlementStore } from '../services/settlementStore.js';
import { createPostgresUsageStore } from '../services/usageStore.js';

Expand Down Expand Up @@ -30,6 +31,11 @@ function createPersistentStoreHarness() {
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE apis (
id VARCHAR(255) PRIMARY KEY,
developer_id VARCHAR(255) NOT NULL
);

CREATE TABLE settlements (
id BIGSERIAL PRIMARY KEY,
external_id VARCHAR(255) NOT NULL UNIQUE,
Expand Down Expand Up @@ -102,6 +108,11 @@ test('PostgresUsageStore records idempotently and marks events as settled', asyn
const { pool, settlementStore, usageStore } = createPersistentStoreHarness();

try {
await pool.query(
'INSERT INTO apis (id, developer_id) VALUES ($1, $2)',
['api-1', 'api-owner-1'],
);

const firstInsert = await usageStore.record({
id: 'ignored-in-pg-store',
requestId: 'req-1',
Expand Down Expand Up @@ -137,6 +148,7 @@ test('PostgresUsageStore records idempotently and marks events as settled', asyn
amountUsdc: 4.25,
statusCode: 200,
apiKey: 'key-1',
userId: 'api-owner-1',
settlementId: undefined,
});

Expand Down Expand Up @@ -165,6 +177,11 @@ test('persistent stores survive new instances and keep developer revenue availab
const harness = createPersistentStoreHarness();

try {
await harness.pool.query(
'INSERT INTO apis (id, developer_id) VALUES ($1, $2)',
['api-restart', 'dev_restart'],
);

await harness.settlementStore.create({
id: 'stl_completed',
developerId: 'dev_restart',
Expand Down Expand Up @@ -196,9 +213,19 @@ test('persistent stores survive new instances and keep developer revenue availab

const app = express();
app.use(express.json());
const developerRepository: DeveloperRepository = {
findByUserId: async () => undefined,
getOrCreateByUserId: async () => {
throw new Error('not used in this test');
},
upsertProfile: async () => {
throw new Error('not used in this test');
},
};
app.use('/api/developers', createDeveloperRouter({
settlementStore: createPostgresSettlementStore(harness.pool),
usageStore: createPostgresUsageStore(harness.pool),
developerRepository,
}));
app.use(errorHandler);

Expand Down
20 changes: 20 additions & 0 deletions src/config/env.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,23 @@ describe('env schema — REST rate limit config', () => {
expect(result.success).toBe(false);
});
});

describe('env schema — revenue ledger indexer config', () => {
it('defaults revenue ledger indexer values when omitted', () => {
const result = envSchema.safeParse({ ...baseEnv });
expect(result.success).toBe(true);
if (result.success) {
expect(result.data.REVENUE_LEDGER_INDEXER_INTERVAL_MS).toBe(30_000);
expect(result.data.REVENUE_LEDGER_INDEXER_BATCH_SIZE).toBe(500);
}
});

it('rejects non-positive revenue ledger indexer values', () => {
const result = envSchema.safeParse({
...baseEnv,
REVENUE_LEDGER_INDEXER_INTERVAL_MS: '0',
REVENUE_LEDGER_INDEXER_BATCH_SIZE: '-10',
});
expect(result.success).toBe(false);
});
});
2 changes: 2 additions & 0 deletions src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ export const envSchema = z
HORIZON_TIMEOUT: z.coerce.number().default(2_000),
SETTLEMENT_STATUS_SYNC_INTERVAL_MS: z.coerce.number().int().positive().default(60_000),
SETTLEMENT_STATUS_SYNC_TIMEOUT_MS: z.coerce.number().int().positive().default(5_000),
REVENUE_LEDGER_INDEXER_INTERVAL_MS: z.coerce.number().int().positive().default(30_000),
REVENUE_LEDGER_INDEXER_BATCH_SIZE: z.coerce.number().int().positive().default(500),

// Stellar network configuration
STELLAR_NETWORK: stellarNetworkSchema.optional(),
Expand Down
4 changes: 4 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ export const config = {
intervalMs: env.SETTLEMENT_STATUS_SYNC_INTERVAL_MS,
timeoutMs: env.SETTLEMENT_STATUS_SYNC_TIMEOUT_MS,
},
revenueLedgerIndexer: {
intervalMs: env.REVENUE_LEDGER_INDEXER_INTERVAL_MS,
batchSize: env.REVENUE_LEDGER_INDEXER_BATCH_SIZE,
},

stellar: {
network: selectedNetwork,
Expand Down
16 changes: 16 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import { createProxyRouter } from './routes/proxyRoutes.js';
import { defaultDeveloperRepository } from './repositories/developerRepository.js';
import { createBillingService } from './services/billingService.js';
import { createRateLimiter } from './services/rateLimiter.js';
import { PgUsageEventsRepository } from './repositories/usageEventsRepository.pg.js';
import { createRevenueLedgerIndexerJob } from './services/revenueLedgerIndexer.js';
import { RevenueSettlementService } from './services/revenueSettlementService.js';
import { createSettlementStatusSyncJob } from './services/settlementStatusSyncJob.js';
import { createPostgresUsageStore } from './services/usageStore.js';
import { createPostgresSettlementStore } from './services/settlementStore.js';
import { createApiRegistry } from './data/apiRegistry.js';
Expand Down Expand Up @@ -237,6 +241,11 @@ if (isDirectExecution) {
const rateLimiter = createRateLimiter(5, 60_000); // 5 reqs per minute
const usageStore = createPostgresUsageStore(pool);
const settlementStore = createPostgresSettlementStore(pool);
const usageEventsRepository = new PgUsageEventsRepository(pool);
const revenueLedgerIndexerJob = createRevenueLedgerIndexerJob(usageEventsRepository, {
intervalMs: config.revenueLedgerIndexer.intervalMs,
batchSize: config.revenueLedgerIndexer.batchSize,
});
const registry = createApiRegistry();
const revenueSettlementService = new RevenueSettlementService(
usageStore,
Expand Down Expand Up @@ -294,6 +303,11 @@ if (isDirectExecution) {
const proxyDrainTracker = createInFlightDrainTracker('gateway-proxy');
const shutdownSubsystems: DrainableSubsystem[] = [
proxyDrainTracker.subsystem,
{
name: 'revenue-ledger-indexer',
beginShutdown: () => revenueLedgerIndexerJob.beginShutdown(),
awaitIdle: () => revenueLedgerIndexerJob.awaitIdle(),
},
{
name: 'webhook-dispatcher',
beginShutdown: stopWebhookDispatching,
Expand All @@ -312,6 +326,7 @@ if (isDirectExecution) {
const PORT = config.port;

const closeAllDataResources = async () => {
revenueLedgerIndexerJob.stop();
settlementStatusSyncJob.stop();
await closeDb();
await Promise.allSettled([
Expand All @@ -325,6 +340,7 @@ if (isDirectExecution) {
async function startServer() {
try {
await initializeDb();
revenueLedgerIndexerJob.start();
settlementStatusSyncJob.start();

const server = app.listen(PORT, () => {
Expand Down
127 changes: 127 additions & 0 deletions src/repositories/usageEventsRepository.pg.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ function createUsageEventsRepository() {
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE apis (
id VARCHAR(255) PRIMARY KEY,
developer_id VARCHAR(255) NOT NULL
);

CREATE TABLE revenue_ledger (
id BIGSERIAL PRIMARY KEY,
api_id VARCHAR(255) NOT NULL,
developer_id VARCHAR(255) NOT NULL,
amount_usdc NUMERIC(20, 0) NOT NULL,
usage_event_id BIGINT UNIQUE REFERENCES usage_events(id),
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_usage_events_user_created ON usage_events(user_id, created_at);
CREATE INDEX idx_usage_events_api_created ON usage_events(api_id, created_at);
`);
Expand Down Expand Up @@ -344,6 +358,11 @@ test('repository validates blank identifiers, invalid ranges, negative amounts,
repository.findByApiId('api-weather', undefined, new Date('nope')),
/to must be a valid date\./,
);

await assert.rejects(
repository.findUnindexedRevenueLedgerEvents('bad-cursor'),
/cursor must be a non-negative integer string\./,
);
} finally {
await pool.end();
}
Expand Down Expand Up @@ -461,3 +480,111 @@ test('repository accepts bigint values returned directly from the database drive
assert.equal(events[0]?.id, '7');
assert.equal(events[0]?.amount, 450n);
});

test('findUnindexedRevenueLedgerEvents resolves developer ownership from apis and skips indexed rows', async () => {
const { repository, pool } = createUsageEventsRepository();

try {
await pool.query(
'INSERT INTO apis (id, developer_id) VALUES ($1, $2), ($3, $4)',
['api-weather', 'dev-weather', 'api-chat', 'dev-chat'],
);

await repository.create({
userId: 'consumer-1',
apiId: 'api-weather',
endpointId: 'endpoint-1',
apiKeyId: 'key-1',
amount: 100n,
requestId: 'req-ledger-1',
createdAt: new Date('2026-02-01T10:00:00.000Z'),
});
await repository.create({
userId: 'consumer-2',
apiId: 'api-chat',
endpointId: 'endpoint-2',
apiKeyId: 'key-2',
amount: 250n,
requestId: 'req-ledger-2',
createdAt: new Date('2026-02-02T10:00:00.000Z'),
});
await repository.create({
userId: 'consumer-3',
apiId: 'api-missing',
endpointId: 'endpoint-3',
apiKeyId: 'key-3',
amount: 999n,
requestId: 'req-ledger-3',
createdAt: new Date('2026-02-03T10:00:00.000Z'),
});

await pool.query(
`
INSERT INTO revenue_ledger (
api_id,
developer_id,
amount_usdc,
usage_event_id,
created_at
)
VALUES ($1, $2, $3, $4, $5)
`,
['api-weather', 'dev-weather', '100', '1', new Date('2026-02-01T10:00:00.000Z')],
);

const events = await repository.findUnindexedRevenueLedgerEvents();

assert.deepEqual(events, [
{
usageEventId: '2',
apiId: 'api-chat',
developerId: 'dev-chat',
amount: 250n,
createdAt: new Date('2026-02-02T10:00:00.000Z'),
},
]);
} finally {
await pool.end();
}
});

test('indexRevenueLedgerEvent inserts idempotently by usageEventId', async () => {
const { repository, pool } = createUsageEventsRepository();

try {
await repository.create({
userId: 'consumer-1',
apiId: 'api-weather',
endpointId: 'endpoint-1',
apiKeyId: 'key-1',
amount: 1500n,
requestId: 'req-ledger-insert',
createdAt: new Date('2026-02-05T10:00:00.000Z'),
});

const insertedFirst = await repository.indexRevenueLedgerEvent({
usageEventId: '1',
apiId: 'api-weather',
developerId: 'dev-weather',
amount: 1500n,
createdAt: new Date('2026-02-05T10:00:00.000Z'),
});
const insertedDuplicate = await repository.indexRevenueLedgerEvent({
usageEventId: '1',
apiId: 'api-weather',
developerId: 'dev-weather',
amount: 1500n,
createdAt: new Date('2026-02-05T10:00:00.000Z'),
});
const count = await pool.query(
'SELECT COUNT(*)::text AS count FROM revenue_ledger WHERE usage_event_id = $1',
['1'],
);

assert.equal(insertedFirst, true);
assert.equal(insertedDuplicate, false);
assert.equal(count.rows[0]?.count, '1');
} finally {
await pool.end();
}
});
Loading
Loading