Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions workspaces/orchestrator/.changeset/twelve-insects-scream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@red-hat-developer-hub/backstage-plugin-orchestrator-backend': minor
---

feature: add the ability to trigger Kafka-based workflows from the plugin
12 changes: 12 additions & 0 deletions workspaces/orchestrator/app-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ orchestrator:
# logStreamSelectors:
# - label: 'selector'
# value: 'value'
kafka:
# A logical identifier of an application.
# https://kafka.js.org/docs/configuration#client-id
clientId: orchestratorKafka
# logLevel override for the orchestrator kafka services. Defaults to INFO which is 4
# logLevel values based on KafkaJS values https://kafka.js.org/docs/configuration#logging
# logLevel: 5 # DEBUG
brokers:
- localhost:9092
# https://kafka.js.org/docs/producing#message-key
# Optional; defaults to an empty string when not set
# messageKey: messageKey
sonataFlowService:
# uncomment the next line to use podman instead of docker
# runtime: podman
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

export interface Config {
Comment thread
lholmquist marked this conversation as resolved.
/**
* Configuration for the Orchestrator plugin.
*/
orchestrator?: {
kafka?: {
// A logical identifier of an application.
// https://kafka.js.org/docs/configuration#client-id
clientId: string;
// logLevel override for the orchestrator kafka services
// logLevel values based on KafkaJS values https://kafka.js.org/docs/configuration#logging
// export enum logLevel {
// NOTHING = 0,
// ERROR = 1,
// WARN = 2,
// INFO = 4,
// DEBUG = 5,
// }
logLevel?: 0 | 1 | 2 | 4 | 5;
/**
* List of brokers in the Kafka cluster to connect to.
*/
brokers: string[];
/**
* https://kafka.js.org/docs/producing#message-key
*/
messageKey?: string;
/**
* Optional SSL connection parameters to connect to the cluster. Passed directly to Node tls.connect.
* See https://nodejs.org/dist/latest-v8.x/docs/api/tls.html#tls_tls_createsecurecontext_options
*/
ssl?:
| {
ca?: string[];
/** @visibility secret */
key?: string;
cert?: string;
rejectUnauthorized?: boolean;
}
| boolean;
/**
* Optional SASL connection parameters.
*/
sasl?: {
mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
username: string;
/** @visibility secret */
password: string;
};
};
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
],
"files": [
"app-config.yaml",
"config.d.ts",
"dist",
"dist-dynamic/*.*",
"dist-dynamic/dist/**",
"static"
],
"configSchema": "config.d.ts",
"scripts": {
"start": "backstage-cli package start",
"build": "backstage-cli package build",
Expand Down Expand Up @@ -78,11 +80,13 @@
"@red-hat-developer-hub/backstage-plugin-orchestrator-node": "workspace:^",
"@urql/core": "^6.0.1",
"ajv-formats": "^2.1.1",
"cloudevents": "^8.0.0",
"cloudevents": "^10.0.0",
"express": "^4.21.2",
"express-promise-router": "^4.1.1",
"fs-extra": "^10.1.0",
"isomorphic-git": "^1.23.0",
"js-yaml": "^4.1.0",
"kafkajs": "^2.2.4",
"lodash": "^4.18.1",
"luxon": "^3.7.2",
"openapi-backend": "^5.10.5",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 The Backstage Authors
* Copyright Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import type { LoggerService } from '@backstage/backend-plugin-api';
import type { Config } from '@backstage/config';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,30 @@ describe('OrchestratorService', () => {
expect(result).toBeDefined();
});
});

describe('executeWorkflowAsCloudEvent', () => {
  // Canned response the mocked SonataFlowService resolves with; the id comes
  // from the file's shared createInstanceIdMock helper.
  const executeResponse: WorkflowExecutionResponse = {
    id: createInstanceIdMock(1),
  };

  beforeEach(() => {
    jest.clearAllMocks();
  });

  it('should execute the operation', async () => {
    // OrchestratorService delegates straight to SonataFlowService, so a
    // resolved mock here should surface as the service's return value.
    sonataFlowServiceMock.executeWorkflowAsCloudEvent = jest
      .fn()
      .mockResolvedValue(executeResponse);

    const result = await orchestratorService.executeWorkflowAsCloudEvent({
      definitionId,
      workflowSource: 'local',
      workflowEventType: 'lock-event',
      contextAttribute: 'lockId',
      inputData,
    });

    // NOTE(review): only checks definedness, not that the mock's response is
    // passed through unchanged — consider asserting result.id as well.
    expect(result).toBeDefined();
  });
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ export class OrchestratorService {
});
}

/**
 * Starts a workflow via the CloudEvent/Kafka path instead of the HTTP
 * service endpoint. Pure delegation: forwards `args` unchanged to
 * SonataFlowService.executeWorkflowAsCloudEvent and returns its result.
 *
 * @param args.definitionId - id of the workflow definition to trigger
 * @param args.workflowSource - source of the workflow definition
 *   (NOTE(review): exact semantics defined by SonataFlowService — confirm)
 * @param args.workflowEventType - event type the target workflow listens for
 * @param args.contextAttribute - CloudEvent context attribute used to
 *   correlate the event with a workflow instance
 * @param args.inputData - optional variables passed to the workflow instance
 * @param args.authTokens - optional auth tokens forwarded with the request
 * @param args.backstageToken - optional Backstage identity token
 */
public async executeWorkflowAsCloudEvent(args: {
  definitionId: string;
  workflowSource: string;
  workflowEventType: string;
  contextAttribute: string;
  inputData?: ProcessInstanceVariables;
  authTokens?: Array<AuthToken>;
  backstageToken?: string;
}) {
  return await this.sonataFlowService.executeWorkflowAsCloudEvent(args);
}

public async executeWorkflow(args: {
definitionId: string;
serviceUrl: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@

import { LoggerService } from '@backstage/backend-plugin-api';

import { OrchestratorKafkaServiceOptions } from '../types/kafka';
import { DataIndexService } from './DataIndexService';
import { SonataFlowService } from './SonataFlowService';

jest.mock('node:crypto', () => ({
randomUUID: () => '12345',
}));

describe('SonataFlowService', () => {
let loggerMock: jest.Mocked<LoggerService>;
let dataIndexServiceMock: jest.Mocked<DataIndexService>;
Expand Down Expand Up @@ -155,6 +160,112 @@ describe('SonataFlowService', () => {
);
});
});

describe('executeWorkflowAsCloudEvent', () => {
  // The service built in the outer beforeEach is constructed without kafka
  // options, so the cloud-event path must reject.
  const runErrorTestAsCloudEventNoKafkaImplementation =
    async (): Promise<void> => {
      await sonataFlowService.executeWorkflowAsCloudEvent({
        definitionId,
        workflowSource: 'workflowSource',
        workflowEventType: 'workflowEventType',
        contextAttribute: 'contextAttribute',
      });
    };

  beforeEach(() => {
    jest.clearAllMocks();
  });

  it('should return an error when no orchestrator kafka config is implemented', async () => {
    // rejects.toThrow fails the test on an unexpected resolve, unlike the
    // try/catch-and-inspect pattern, which would only fail later with a
    // confusing "undefined" message assertion.
    await expect(
      runErrorTestAsCloudEventNoKafkaImplementation(),
    ).rejects.toThrow('No Orchestrator kafka implementation added');
  });

  it('should return the contextAttributeId on successful send', async () => {
    const kafkaServiceOptionsMock: OrchestratorKafkaServiceOptions = {
      clientId: 'kafkaClientId',
      brokers: ['localhost:9091'],
    };
    const sonataFlowServiceWithKafka = new SonataFlowService(
      dataIndexServiceMock,
      loggerMock,
      kafkaServiceOptionsMock,
    );
    // Stub the kafkajs producer so no real broker connection is attempted.
    const spy = jest
      .spyOn(
        sonataFlowServiceWithKafka.getOrchestratorKafkaImpl() as any,
        'producer',
      )
      .mockImplementation(() => ({
        connect: jest.fn(),
        send: jest.fn(),
        disconnect: jest.fn(),
      }));

    const result = await sonataFlowServiceWithKafka.executeWorkflowAsCloudEvent(
      {
        definitionId,
        workflowSource: 'workflowSource',
        workflowEventType: 'workflowEventType',
        contextAttribute: 'lockid',
      },
    );

    expect(spy).toHaveBeenCalled();
    // node:crypto randomUUID is mocked at the top of this file to '12345',
    // so the generated instance id is deterministic.
    expect(result).toBeDefined();
    expect(result?.id).toEqual('12345');
  });

  it('should error on a bad connection', async () => {
    const kafkaServiceOptionsMock: OrchestratorKafkaServiceOptions = {
      clientId: 'kafkaClientId',
      brokers: ['localhost:9091'],
    };
    const sonataFlowServiceWithKafka = new SonataFlowService(
      dataIndexServiceMock,
      loggerMock,
      kafkaServiceOptionsMock,
    );
    // Force the producer's connect() to fail to exercise the error path.
    jest
      .spyOn(
        sonataFlowServiceWithKafka.getOrchestratorKafkaImpl() as any,
        'producer',
      )
      .mockImplementation(() => ({
        connect: jest
          .fn()
          .mockRejectedValue(new Error('Wrong Connection Info')),
        send: jest.fn(),
        disconnect: jest.fn(),
      }));

    await expect(
      sonataFlowServiceWithKafka.executeWorkflowAsCloudEvent({
        definitionId,
        workflowSource: 'workflowSource',
        workflowEventType: 'workflowEventType',
        contextAttribute: 'lockid',
      }),
    ).rejects.toThrow(
      'Error with Kafka client with connection Options: clientId: kafkaClientId and broker: ["localhost:9091"]',
    );
  });
});

describe('executeWorkflow', () => {
const inputData = { var1: 'value1' };
const urlToFetch = 'http://example.com/workflows/workflow-123';
Expand Down
Loading
Loading