Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/cozy-melons-march.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@rocket.chat/model-typings': patch
'@rocket.chat/core-typings': patch
'@rocket.chat/models': patch
'@rocket.chat/meteor': patch
---

Prevents over-assignment of omnichannel agents beyond their max chats limit in microservices deployments by serializing agent assignment with explicit user-level locking.
6 changes: 6 additions & 0 deletions .changeset/good-singers-kiss.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@rocket.chat/meteor': patch
---

Fixes issue that caused Outgoing Webhook Retry Count to not be a number

81 changes: 51 additions & 30 deletions apps/meteor/app/livechat/server/lib/RoutingManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
updateChatDepartment,
allowAgentSkipQueue,
} from './Helper';
import { conditionalLockAgent } from './conditionalLockAgent';
import { afterTakeInquiry, beforeDelegateAgent } from './hooks';
import { callbacks } from '../../../../server/lib/callbacks';
import { notifyOnLivechatInquiryChangedById, notifyOnLivechatInquiryChanged } from '../../../lib/server/lib/notifyListener';
Expand Down Expand Up @@ -257,19 +258,35 @@ export const RoutingManager: Routing = {
return room;
}

try {
await callbacks.run('livechat.checkAgentBeforeTakeInquiry', {
agent,
inquiry,
options,
const lock = await conditionalLockAgent(agent.agentId);
if (!lock.acquired && lock.required) {
logger.debug({
msg: 'Cannot take inquiry because agent is currently locked by another process',
agentId: agent.agentId,
inquiryId: _id,
});
} catch (e) {
if (options.clientAction && !options.forwardingToDepartment) {
throw e;
throw new Error('error-agent-is-locked');
}
agent = null;
}

if (agent) {
try {
await callbacks.run('livechat.checkAgentBeforeTakeInquiry', {
agent,
inquiry,
options,
});
} catch (e) {
await lock.unlock();
if (options.clientAction && !options.forwardingToDepartment) {
throw e;
}
agent = null;
}
}

if (!agent) {
logger.debug({ msg: 'Cannot take inquiry. Precondition failed for agent', inquiryId: inquiry._id });
const cbRoom = await callbacks.run<'livechat.onAgentAssignmentFailed'>('livechat.onAgentAssignmentFailed', room, {
Expand All @@ -279,35 +296,39 @@ export const RoutingManager: Routing = {
return cbRoom;
}

const result = await LivechatInquiry.takeInquiry(_id, inquiry.lockedAt);
if (result.modifiedCount === 0) {
logger.error({ msg: 'Failed to take inquiry, could not match lockedAt', inquiryId: _id, lockedAt: inquiry.lockedAt });
throw new Error('error-taking-inquiry-lockedAt-mismatch');
}
try {
const result = await LivechatInquiry.takeInquiry(_id, inquiry.lockedAt);
if (result.modifiedCount === 0) {
logger.error({ msg: 'Failed to take inquiry because lockedAt did not match', inquiryId: _id, lockedAt: inquiry.lockedAt });
throw new Error('error-taking-inquiry-lockedAt-mismatch');
}

logger.info({ msg: 'Inquiry taken by agent', inquiryId: inquiry._id, agentId: agent.agentId });
logger.info({ msg: 'Inquiry taken', inquiryId: _id, agentId: agent.agentId });

// assignAgent changes the room data to add the agent serving the conversation. afterTakeInquiry expects room object to be updated
const { inquiry: returnedInquiry, user } = await this.assignAgent(inquiry as InquiryWithAgentInfo, agent);
const roomAfterUpdate = await LivechatRooms.findOneById(rid);
// assignAgent changes the room data to add the agent serving the conversation. afterTakeInquiry expects room object to be updated
const { inquiry: returnedInquiry, user } = await this.assignAgent(inquiry, agent);
const roomAfterUpdate = await LivechatRooms.findOneById(rid);

if (!roomAfterUpdate) {
// This should never happen
throw new Error('error-room-not-found');
}
if (!roomAfterUpdate) {
// This should never happen
throw new Error('error-room-not-found');
}

void Apps.self?.triggerEvent(AppEvents.IPostLivechatAgentAssigned, { room: roomAfterUpdate, user });
void afterTakeInquiry({ inquiry: returnedInquiry, room: roomAfterUpdate, agent });
void Apps.self?.triggerEvent(AppEvents.IPostLivechatAgentAssigned, { room: roomAfterUpdate, user });
void afterTakeInquiry({ inquiry: returnedInquiry, room: roomAfterUpdate, agent });

void notifyOnLivechatInquiryChangedById(inquiry._id, 'updated', {
status: LivechatInquiryStatus.TAKEN,
takenAt: new Date(),
defaultAgent: undefined,
estimatedInactivityCloseTimeAt: undefined,
queuedAt: undefined,
});
void notifyOnLivechatInquiryChangedById(inquiry._id, 'updated', {
status: LivechatInquiryStatus.TAKEN,
takenAt: new Date(),
defaultAgent: undefined,
estimatedInactivityCloseTimeAt: undefined,
queuedAt: undefined,
});

return roomAfterUpdate;
return roomAfterUpdate;
} finally {
await lock.unlock();
}
},

async transferRoom(room, guest, transferData) {
Expand Down
35 changes: 35 additions & 0 deletions apps/meteor/app/livechat/server/lib/conditionalLockAgent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Users } from '@rocket.chat/models';

import { settings } from '../../../settings/server';

type LockResult = {
acquired: boolean;
required: boolean;
unlock: () => Promise<void>;
};

export async function conditionalLockAgent(agentId: string): Promise<LockResult> {
// Lock and chats limits enforcement are only required when waiting_queue is enabled
const shouldLock = settings.get<boolean>('Livechat_waiting_queue');

if (!shouldLock) {
return {
acquired: false,
required: false,
unlock: async () => {
// no-op
},
};
}

const lockTime = new Date();
const lockAcquired = await Users.acquireAgentLock(agentId, lockTime);

return {
acquired: !!lockAcquired,
required: true,
unlock: async () => {
await Users.releaseAgentLock(agentId, lockTime);
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
FieldGroup,
Select,
Accordion,
NumberInput,
} from '@rocket.chat/fuselage';
import type { TranslationKey } from '@rocket.chat/ui-contexts';
import DOMPurify from 'dompurify';
Expand Down Expand Up @@ -476,7 +477,9 @@ const OutgoingWebhookForm = () => {
<Controller
name='retryCount'
control={control}
render={({ field }) => <TextInput id={retryCountField} {...field} aria-describedby={`${retryCountField}-hint`} />}
render={({ field }) => (
<NumberInput {...field} id={retryCountField} min='0' step='1' aria-describedby={`${retryCountField}-hint`} />
)}
/>
</FieldRow>
<FieldHint id={`${retryCountField}-hint`}>{t('Integration_Retry_Count_Description')}</FieldHint>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { expect } from 'chai';
import proxyquire from 'proxyquire';
import sinon from 'sinon';

const mockUsers = {
acquireAgentLock: sinon.stub(),
releaseAgentLock: sinon.stub(),
};

const mockSettings = {
get: sinon.stub(),
};

const { conditionalLockAgent } = proxyquire.noCallThru().load('../../../../../../app/livechat/server/lib/conditionalLockAgent', {
'@rocket.chat/models': { Users: mockUsers },
'../../../settings/server': { settings: mockSettings },
});

describe('conditionalLockAgent', () => {
beforeEach(() => {
mockUsers.acquireAgentLock.reset();
mockUsers.releaseAgentLock.reset();
mockSettings.get.reset();
});

describe('when waiting_queue is enabled', () => {
beforeEach(() => {
mockSettings.get.withArgs('Livechat_waiting_queue').returns(true);
});

it('should return acquired: true when lock is successfully acquired', async () => {
mockUsers.acquireAgentLock.resolves(true);

const result = await conditionalLockAgent('agent1');

expect(result.acquired).to.equal(true);
expect(result.required).to.equal(true);
expect(mockUsers.acquireAgentLock.calledOnce).to.equal(true);
});

it('should return acquired: false when lock is already held by another process', async () => {
mockUsers.acquireAgentLock.resolves(false);

const result = await conditionalLockAgent('agent1');

expect(result.acquired).to.equal(false);
expect(result.required).to.equal(true);
expect(mockUsers.acquireAgentLock.calledOnce).to.equal(true);
});

it('should call releaseAgentLock when unlock is called', async () => {
mockUsers.acquireAgentLock.resolves(true);
mockUsers.releaseAgentLock.resolves(true);

const result = await conditionalLockAgent('agent1');
await result.unlock();

expect(mockUsers.releaseAgentLock.calledOnce).to.equal(true);
expect(mockUsers.releaseAgentLock.firstCall.args[0]).to.equal('agent1');
expect(mockUsers.releaseAgentLock.firstCall.args[1]).to.be.instanceOf(Date);
});
});

describe('when waiting_queue is disabled', () => {
beforeEach(() => {
mockSettings.get.withArgs('Livechat_waiting_queue').returns(false);
});

it('should return acquired: false and required: false without calling acquireAgentLock', async () => {
const result = await conditionalLockAgent('agent1');

expect(result.acquired).to.equal(false);
expect(result.required).to.equal(false);
expect(mockUsers.acquireAgentLock.called).to.equal(false);
});

it('should have a no-op unlock function', async () => {
const result = await conditionalLockAgent('agent1');
await result.unlock(); // should not throw an error

expect(mockUsers.releaseAgentLock.called).to.equal(false);
});
});
});
2 changes: 1 addition & 1 deletion packages/apps-engine/deno-runtime/deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"uuid": "npm:uuid@8.3.2"
},
"tasks": {
"test": "deno test --no-check --allow-read=../../../"
"test": "deno test --no-check --allow-read=../../../,/tmp --allow-write=/tmp"
},
"fmt": {
"lineWidth": 160,
Expand Down
12 changes: 7 additions & 5 deletions packages/apps-engine/deno-runtime/handlers/api-handler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { Defined, JsonRpcError } from 'jsonrpc-lite';
import type { IApiEndpoint } from '@rocket.chat/apps-engine/definition/api/IApiEndpoint.ts';
import { Defined, JsonRpcError } from 'jsonrpc-lite';

import { AppObjectRegistry } from '../AppObjectRegistry.ts';
import { Logger } from '../lib/logger.ts';
import { AppAccessorsInstance } from '../lib/accessors/mod.ts';
import { RequestContext } from '../lib/requestContext.ts';

export default async function apiHandler(call: string, params: unknown): Promise<JsonRpcError | Defined> {
export default async function apiHandler(request: RequestContext): Promise<JsonRpcError | Defined> {
const { method: call, params } = request;
const [, path, httpMethod] = call.split(':');

const endpoint = AppObjectRegistry.get<IApiEndpoint>(`api:${path}`);
Expand All @@ -21,14 +23,14 @@ export default async function apiHandler(call: string, params: unknown): Promise
return new JsonRpcError(`${path}'s ${httpMethod} not exists`, -32000);
}

const [request, endpointInfo] = params as Array<unknown>;
const [requestData, endpointInfo] = params as Array<unknown>;

logger?.debug(`${path}'s ${call} is being executed...`, request);
logger?.debug(`${path}'s ${call} is being executed...`, requestData);

try {
// deno-lint-ignore ban-types
const result = await (method as Function).apply(endpoint, [
request,
requestData,
endpointInfo,
AppAccessorsInstance.getReader(),
AppAccessorsInstance.getModifier(),
Expand Down
11 changes: 8 additions & 3 deletions packages/apps-engine/deno-runtime/handlers/app/construct.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { Socket } from 'node:net';

import type { IParseAppPackageResult } from '@rocket.chat/apps-engine/server/compiler/IParseAppPackageResult.ts';

import { AppObjectRegistry } from '../../AppObjectRegistry.ts';
import { require } from '../../lib/require.ts';
import { sanitizeDeprecatedUsage } from '../../lib/sanitizeDeprecatedUsage.ts';
import { AppAccessorsInstance } from '../../lib/accessors/mod.ts';
import { Socket } from 'node:net';
import { RequestContext } from '../../lib/requestContext.ts';

const ALLOWED_NATIVE_MODULES = ['path', 'url', 'crypto', 'buffer', 'stream', 'net', 'http', 'https', 'zlib', 'util', 'punycode', 'os', 'querystring', 'fs'];
const ALLOWED_EXTERNAL_MODULES = ['uuid'];
Expand All @@ -13,7 +15,8 @@ function prepareEnvironment() {
// Deno does not behave equally to Node when it comes to piping content to a socket
// So we intervene here
const originalFinal = Socket.prototype._final;
Socket.prototype._final = function _final(cb) {
// deno-lint-ignore no-explicit-any
Socket.prototype._final = function _final(cb: any) {
// Deno closes the readable stream in the Socket earlier than Node
// The exact reason for that is yet unknown, so we'll need to simply delay the execution
// which allows data to be read in a response
Expand Down Expand Up @@ -71,7 +74,9 @@ function wrapAppCode(code: string): (require: (module: string) => unknown) => Pr
) as (require: (module: string) => unknown) => Promise<Record<string, unknown>>;
}

export default async function handleConstructApp(params: unknown): Promise<boolean> {
export default async function handleConstructApp(request: RequestContext): Promise<boolean> {
const { params } = request;

if (!Array.isArray(params)) {
throw new Error('Invalid params', { cause: 'invalid_param_type' });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import type { App } from '@rocket.chat/apps-engine/definition/App.ts';

import { AppObjectRegistry } from '../../AppObjectRegistry.ts';
import { AppAccessorsInstance } from '../../lib/accessors/mod.ts';
import { RequestContext } from '../../lib/requestContext.ts';

export default async function handleInitialize(): Promise<boolean> {
export default async function handleInitialize(_request: RequestContext): Promise<boolean> {
const app = AppObjectRegistry.get<App>('app');

if (typeof app?.initialize !== 'function') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import type { App } from '@rocket.chat/apps-engine/definition/App.ts';

import { AppObjectRegistry } from '../../AppObjectRegistry.ts';
import { AppAccessorsInstance } from '../../lib/accessors/mod.ts';
import { RequestContext } from '../../lib/requestContext.ts';

export default async function handleOnDisable(): Promise<boolean> {
export default async function handleOnDisable(_request: RequestContext): Promise<boolean> {
const app = AppObjectRegistry.get<App>('app');

if (typeof app?.onDisable !== 'function') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import type { App } from '@rocket.chat/apps-engine/definition/App.ts';

import { AppObjectRegistry } from '../../AppObjectRegistry.ts';
import { AppAccessorsInstance } from '../../lib/accessors/mod.ts';
import { RequestContext } from '../../lib/requestContext.ts';

export default function handleOnEnable(): Promise<boolean> {
export default function handleOnEnable(_request: RequestContext): Promise<boolean> {
const app = AppObjectRegistry.get<App>('app');

if (typeof app?.onEnable !== 'function') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import type { App } from '@rocket.chat/apps-engine/definition/App.ts';

import { AppObjectRegistry } from '../../AppObjectRegistry.ts';
import { AppAccessorsInstance } from '../../lib/accessors/mod.ts';
import { RequestContext } from '../../lib/requestContext.ts';

export default async function handleOnInstall(params: unknown): Promise<boolean> {
export default async function handleOnInstall(request: RequestContext): Promise<boolean> {
const { params } = request;
const app = AppObjectRegistry.get<App>('app');

if (typeof app?.onInstall !== 'function') {
Expand Down
Loading
Loading