Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,9 @@ engine.summingMergeTree({
engine.aggregatingMergeTree({
sortingKey: ["date"],
});

// Null (for materialized view source tables that discard raw input)
engine.null();
```

## Next.js Integration
Expand Down
263 changes: 263 additions & 0 deletions e2e-live/null-engine.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
import {
describe,
it,
expect,
beforeAll,
beforeEach,
afterEach,
afterAll,
} from "vitest";
import * as fs from "node:fs";
import * as path from "node:path";
import { runInit } from "../src/cli/commands/init.js";
import { runBuild } from "../src/cli/commands/build.js";
import { getBranch, deleteBranch, BranchApiError } from "../src/api/branches.js";
import { createClient } from "../src/client/base.js";
import {
getLiveE2EConfigFromEnv,
assertWorkspaceAdminToken,
createWorkspaceWithToken,
deleteWorkspace,
type LiveE2EConfig,
} from "./cloud-workspace.js";
import {
ensureDistBuild,
createTempProjectDir,
cleanupTempProjectDir,
setConfigBaseUrl,
} from "./test-project.js";

const liveConfig = getLiveE2EConfigFromEnv();
const describeLive = liveConfig ? describe : describe.skip;

const SOURCE_DATASOURCE_NAME = "null_engine_source_events";
const TARGET_DATASOURCE_NAME = "null_engine_processed_events";
const MATERIALIZED_PIPE_NAME = "null_engine_processed_mv";
const ENDPOINT_NAME = "null_engine_processed_rows";

function toTinybirdDateTime(value: Date): string {
return value.toISOString().slice(0, 19).replace("T", " ");
}

function writeNullEngineTestEntities(projectDir: string): void {
const tinybirdFile = path.join(projectDir, "lib", "tinybird.ts");
const content = `import {
defineDatasource,
defineEndpoint,
defineMaterializedView,
Tinybird,
node,
t,
engine,
} from "@tinybirdco/sdk";

export const nullEngineSourceEvents = defineDatasource("${SOURCE_DATASOURCE_NAME}", {
schema: {
timestamp: t.dateTime(),
run_id: t.string(),
metric_value: t.int32(),
},
engine: engine.null(),
});

export const nullEngineProcessedEvents = defineDatasource("${TARGET_DATASOURCE_NAME}", {
schema: {
timestamp: t.dateTime(),
run_id: t.string(),
metric_value: t.int32(),
doubled_metric_value: t.int32(),
},
engine: engine.mergeTree({
sortingKey: ["run_id", "timestamp"],
}),
});

export const nullEngineProcessedMv = defineMaterializedView("${MATERIALIZED_PIPE_NAME}", {
datasource: nullEngineProcessedEvents,
nodes: [
node({
name: "processed",
sql: \`
SELECT
timestamp,
run_id,
metric_value,
metric_value * 2 AS doubled_metric_value
FROM ${SOURCE_DATASOURCE_NAME}
\`,
}),
],
});

export const nullEngineProcessedRows = defineEndpoint("${ENDPOINT_NAME}", {
nodes: [
node({
name: "rows",
sql: \`
SELECT
timestamp,
run_id,
metric_value,
doubled_metric_value
FROM ${TARGET_DATASOURCE_NAME}
ORDER BY timestamp DESC
LIMIT 100
\`,
}),
],
output: {
timestamp: t.dateTime(),
run_id: t.string(),
metric_value: t.int32(),
doubled_metric_value: t.int32(),
},
});

export const tinybird = new Tinybird({
datasources: {
nullEngineSourceEvents,
nullEngineProcessedEvents,
},
pipes: {
nullEngineProcessedMv,
nullEngineProcessedRows,
},
});
`;

fs.writeFileSync(tinybirdFile, content);
}

async function waitForProcessedRows(
branchToken: string,
baseUrl: string,
runId: string
): Promise<Array<Record<string, unknown>>> {
const client = createClient({ baseUrl, token: branchToken });

for (let attempt = 0; attempt < 15; attempt++) {
const result = await client.query<Record<string, unknown>>(ENDPOINT_NAME);
const rows = result.data.filter((row) => row.run_id === runId);

if (rows.length > 0) {
return rows;
}

await new Promise((resolve) => setTimeout(resolve, 1_000));
}

return [];
}

describeLive("E2E Live: Null engine", () => {
const config = liveConfig as LiveE2EConfig;

let tempDir = "";
let originalEnv: NodeJS.ProcessEnv;

let workspaceId = "";
let workspaceName = "";
let workspaceToken = "";
let branchName = "";

beforeAll(async () => {
ensureDistBuild();
await assertWorkspaceAdminToken(config);

const workspace = await createWorkspaceWithToken(config, "sdk_null_engine");
workspaceId = workspace.id;
workspaceName = workspace.name;
workspaceToken = workspace.token;
});

beforeEach(() => {
tempDir = createTempProjectDir();
originalEnv = { ...process.env };
process.env.GITHUB_REF_NAME = `live-null-engine/${workspaceName}`;
process.env.TINYBIRD_TOKEN = workspaceToken;
});

afterEach(async () => {
if (branchName) {
try {
await deleteBranch(
{ baseUrl: config.baseUrl, token: workspaceToken },
branchName
);
} catch (error) {
if (!(error instanceof BranchApiError && error.status === 404)) {
throw error;
}
}
}

branchName = "";
process.env = originalEnv;
cleanupTempProjectDir(tempDir);
});

afterAll(async () => {
if (workspaceId && workspaceName) {
await deleteWorkspace(config, workspaceId, workspaceName);
}
});

it("builds a Null engine source and ingests through its materialized view target", async () => {
const initResult = await runInit({
cwd: tempDir,
skipLogin: true,
devMode: "branch",
clientPath: "lib/tinybird.ts",
});
expect(initResult.success).toBe(true);

setConfigBaseUrl(tempDir, config.baseUrl);
writeNullEngineTestEntities(tempDir);

const buildResult = await runBuild({ cwd: tempDir });
expect(buildResult.success).toBe(true);
expect(buildResult.deploy?.success).toBe(true);

const sourceDatasource = buildResult.build?.resources.datasources.find(
(datasource) => datasource.name === SOURCE_DATASOURCE_NAME
);
expect(sourceDatasource?.content).toContain("ENGINE Null");
expect(sourceDatasource?.content).not.toContain("ENGINE_SORTING_KEY");

branchName = buildResult.branchInfo?.tinybirdBranch ?? "";
expect(branchName).toBeTruthy();

const branch = await getBranch(
{ baseUrl: config.baseUrl, token: workspaceToken },
branchName
);
const branchToken = branch.token ?? "";
expect(branchToken).toBeTruthy();

const client = createClient({
baseUrl: config.baseUrl,
token: branchToken,
});

const runId = `null_engine_${Date.now()}`;
const metricValue = 21;

const ingestResult = await client.datasources.ingest(SOURCE_DATASOURCE_NAME, {
timestamp: toTinybirdDateTime(new Date()),
run_id: runId,
metric_value: metricValue,
});
expect(ingestResult.successful_rows).toBe(1);

const rows = await waitForProcessedRows(branchToken, config.baseUrl, runId);
expect(rows.length).toBeGreaterThan(0);
expect(
rows.some(
(row) =>
row.run_id === runId &&
Number(row.metric_value) === metricValue &&
Number(row.doubled_metric_value) === metricValue * 2
)
).toBe(true);
});
});
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tinybirdco/sdk",
"version": "0.0.69",
"version": "0.0.70",
"description": "TypeScript SDK for Tinybird Forward - define datasources and pipes as TypeScript",
"type": "module",
"main": "./dist/index.js",
Expand Down
29 changes: 29 additions & 0 deletions src/cli/commands/migrate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,35 @@ TYPE endpoint
expect(output).not.toContain(", engine,");
});

it("migrates datasource with Null engine", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
tempDirs.push(tempDir);

writeFile(
tempDir,
"null_source.datasource",
`SCHEMA >
id String,
timestamp DateTime

ENGINE Null
`
);

const result = await runMigrate({
cwd: tempDir,
patterns: ["."],
strict: true,
});

expect(result.success).toBe(true);
expect(result.errors).toHaveLength(0);

const output = fs.readFileSync(result.outputPath, "utf-8");
expect(output).toContain('export const nullSource = defineDatasource("null_source", {');
expect(output).toContain("engine: engine.null(),");
});

it("infers MergeTree when engine options exist without ENGINE directive", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
tempDirs.push(tempDir);
Expand Down
7 changes: 7 additions & 0 deletions src/codegen/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ describe("generateEngineCode", () => {
expect(code).toContain('version: "version"');
});

it("generates Null engine", () => {
const code = generateEngineCode({
type: "Null",
});
expect(code).toBe("engine.null()");
});

it("defaults to mergeTree for unknown engine types", () => {
const code = generateEngineCode({
type: "UnknownEngine",
Expand Down
5 changes: 5 additions & 0 deletions src/codegen/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ export function generateEngineCode(engine: {
version?: string;
summing_columns?: string;
}): string {
if (engine.type === "Null") {
return "engine.null()";
}

const sortingKey = parseSortingKey(engine.sorting_key);

// Build options object
Expand Down Expand Up @@ -143,6 +147,7 @@ export function generateEngineCode(engine: {
AggregatingMergeTree: "engine.aggregatingMergeTree",
CollapsingMergeTree: "engine.collapsingMergeTree",
VersionedCollapsingMergeTree: "engine.versionedCollapsingMergeTree",
Null: "engine.null",
};

const engineFunc = engineFunctionMap[engine.type] ?? "engine.mergeTree";
Expand Down
13 changes: 13 additions & 0 deletions src/generator/datasource.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ describe('Datasource Generator', () => {
expect(result.content).toContain('ENGINE_SORTING_KEY "id"');
});

it('includes Null engine configuration without sorting key', () => {
const ds = defineDatasource('test_ds', {
schema: {
id: t.string(),
},
engine: engine.null(),
});

const result = generateDatasource(ds);
expect(result.content).toContain('ENGINE Null');
expect(result.content).not.toContain('ENGINE_SORTING_KEY');
});

it('includes partition key in engine config', () => {
const ds = defineDatasource('test_ds', {
schema: {
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,15 @@ export {
export { engine, getEngineClause, getSortingKey, getPrimaryKey } from "./schema/engines.js";
export type {
EngineConfig,
MergeTreeEngineConfig,
BaseMergeTreeConfig,
MergeTreeConfig,
ReplacingMergeTreeConfig,
SummingMergeTreeConfig,
AggregatingMergeTreeConfig,
CollapsingMergeTreeConfig,
VersionedCollapsingMergeTreeConfig,
NullEngineConfig,
} from "./schema/engines.js";

// ============ Utilities ============
Expand Down
Loading
Loading