Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion diagram-editor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tower = "0.5.2"

[features]
default = ["frontend", "router"]
default = ["frontend", "router", "python"]
debug = ["router", "axum/ws"]
frontend = ["router", "dep:flate2", "dep:tar"]
router = []
Expand All @@ -69,6 +69,7 @@ basic_executor = [
"tokio/rt-multi-thread",
"axum/default",
]
python = ["crossflow/python"]

[[bin]]
name = "print_schema"
Expand Down
Binary file modified diagram-editor/dist.tar.gz
Binary file not shown.
16 changes: 16 additions & 0 deletions diagram-editor/frontend/add-operation.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
ListenIcon,
NodeIcon,
ScopeIcon,
ScriptIcon,
SectionBufferIcon,
type SectionBufferNode,
SectionIcon,
Expand Down Expand Up @@ -287,6 +288,21 @@ function AddOperation({ parentId, newNodePosition, onAdd }: AddOperationProps) {
>
Transform
</StyledOperationButton>
<StyledOperationButton
startIcon={<ScriptIcon />}
onClick={() => {
onAdd?.(
createNodeChange(namespace, parentId, newNodePosition, {
type: 'script',
environment: '',
run: '',
next: { builtin: 'dispose' },
}),
);
}}
>
Script
</StyledOperationButton>
<StyledOperationButton
startIcon={<BufferIcon />}
onClick={() =>
Expand Down
115 changes: 115 additions & 0 deletions diagram-editor/frontend/api-client/rest-client-integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* @jest-environment node
*/

import fs from 'fs';
import path from 'path';
import { spawn, ChildProcess } from 'child_process';
import { firstValueFrom } from 'rxjs';
import { ApiClient } from './rest-client';
import type { Diagram } from '../types/api';

const calculatorDiagramsDir = path.join(__dirname, '../../../examples/diagram/calculator/diagrams');

function getJsonDiagrams(dir: string): string[] {
if (!fs.existsSync(dir)) {
return [];
}
return fs
.readdirSync(dir)
.filter((file) => file.endsWith('.json') && file !== 'test-diagram.json' && file !== 'test-diagram-scope.json')
.map((file) => path.join(dir, file));
}

describe('REST API Executor Integration Tests', () => {
let backendProcess: ChildProcess;
const apiClient = new ApiClient();
const originalFetch = global.fetch;

beforeAll(async () => {
// Setup fetch interceptor for relative REST API requests
global.fetch = (input: RequestInfo | URL, init?: RequestInit) => {
if (typeof input === 'string' && input.startsWith('/api/')) {
input = `http://localhost:3001${input}`;
}
return originalFetch(input, init);
};

// Start the calculator executor server in the background on port 3001
const calculatorCwd = path.join(calculatorDiagramsDir, '..');

backendProcess = spawn('cargo', ['run', '--features', 'python', '--', 'serve', '--port', '3001'], {
cwd: calculatorCwd,
env: {
...process.env,
BUILD_FRONTEND: '1', // Prevent build-script blocking
},
stdio: 'ignore', // Ignore server output
});

// Wait until the server is online and ready
await waitForServer('http://localhost:3001/api/registry');
}, 35000); // Allow up to 35s for compile & launch

afterAll(() => {
// Restore global fetch
global.fetch = originalFetch;

// Cleanup spawned backend server process
if (backendProcess) {
backendProcess.kill('SIGTERM');
}
});

const diagramPaths = getJsonDiagrams(calculatorDiagramsDir);

for (const diagramPath of diagramPaths) {
const fileName = path.basename(diagramPath);

const diagram: Diagram = JSON.parse(fs.readFileSync(diagramPath, 'utf-8'));
if (diagram.input_examples && diagram.input_examples.length > 0) {
describe(`Diagram: ${fileName}`, () => {
for (const example of diagram.input_examples) {
test(`postRunWorkflow with example: "${example.description}"`, async () => {
let requestPayload: unknown = example.value;
if (typeof example.value === 'string') {
try {
// If it parses as JSON (like arrays or objects), we use the parsed value
requestPayload = JSON.parse(example.value);
} catch (e) {
// Otherwise check if it's a number representation
const num = Number(example.value);
if (!isNaN(num)) {
requestPayload = num;
}
}
}

const response = await firstValueFrom(
apiClient.postRunWorkflow(diagram, requestPayload)
);

expect(response).toBeDefined();
expect(response).not.toBeNull();
}, 15000);
}
});
}
}
});

async function waitForServer(url: string, timeoutMs = 30000): Promise<void> {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
try {
const res = await fetch(url);
if (res.ok) {
return;
}
} catch (e) {
// Ignored
}
await new Promise((resolve) => setTimeout(resolve, 200));
}
throw new Error(`Server at ${url} did not become ready in time`);
}
Loading
Loading