Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
"version": "1.0.0",
"description": "Integrate RocketChat into any React based web app using this extensible, robust component library",
"packageManager": "yarn@3.6.4",
"resolutions": {
"@rocket.chat/core-typings": "6.10.0",
"@rocket.chat/media-signaling": "https://registry.yarnpkg.com/@rocket.chat/ddp-client/-/ddp-client-0.3.45.tgz"
},
"workspaces": [
"packages/*",
"!packages/docs"
Expand Down
2 changes: 1 addition & 1 deletion packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@
},
"dependencies": {
"@embeddedchat/auth": "workspace:^",
"@rocket.chat/sdk": "^1.0.0-alpha.42"
"@rocket.chat/ddp-client": "0.3.45"
}
}
180 changes: 80 additions & 100 deletions packages/api/src/EmbeddedChatApi.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Rocketchat } from "@rocket.chat/sdk";
import { DDPClient } from "@rocket.chat/ddp-client";
import cloneArray from "./cloneArray";
import { ROCKETCHAT_APP_ID } from "./utils/constants";
import {
Expand All @@ -7,12 +7,13 @@ import {
ApiError,
} from "@embeddedchat/auth";

// mutliple typing status can come at the same time they should be processed in order.
// multiple typing status can come at the same time they should be processed in order.
let typingHandlerLock = 0;

export default class EmbeddedChatApi {
host: string;
rid: string;
rcClient: Rocketchat;
ddp: DDPClient;
onMessageCallbacks: ((message: any) => void)[];
onMessageDeleteCallbacks: ((messageId: string) => void)[];
onTypingStatusCallbacks: ((users: string[]) => void)[];
Expand All @@ -28,12 +29,13 @@ export default class EmbeddedChatApi {
) {
this.host = host;
this.rid = rid;
this.rcClient = new Rocketchat({
protocol: "ddp",
host: this.host,
useSsl: !/http:\/\//.test(host),
reopen: 20000,

const wsHost = host.replace(/^http/, "ws").replace(/\/$/, "") + "/websocket";

this.ddp = new DDPClient({
src: wsHost,
});

this.onMessageCallbacks = [];
this.onMessageDeleteCallbacks = [];
this.onTypingStatusCallbacks = [];
Expand All @@ -60,8 +62,7 @@ export default class EmbeddedChatApi {
return this.host;
}

/**
* Todo refactor
/** * Todo refactor
*/
async googleSSOLogin(signIn: Function, acsCode: string) {
const tokens = await signIn();
Expand Down Expand Up @@ -176,100 +177,90 @@ export default class EmbeddedChatApi {
async logout() {
try {
await this.auth.logout();
this.ddp.disconnect();
} catch (err) {
console.error(err);
}
}

/**
* All subscriptions are implemented here.
* TODO: Add logic to call thread message event listeners. To be done after thread implementation
*/
async connect() {
try {
await this.close(); // before connection, all previous subscriptions should be cancelled
await this.rcClient.connect({});
const token = (await this.auth.getCurrentUser())?.authToken;
await this.rcClient.resume({ token });
await this.rcClient.subscribeRoom(this.rid);
await this.rcClient.onMessage((data: any) => {
if (!data) {
return;
}
await this.close();
await this.ddp.connect();

const { authToken, userId } = (await this.auth.getCurrentUser()) || {};
if (authToken) {
await this.ddp.call("login", [{ resume: authToken }]);
}

await this.ddp.subscribe("stream-room-messages", [this.rid]);

this.ddp.on("stream-room-messages", (ddpMessage: any) => {
const data = ddpMessage?.fields?.args?.[0];
if (!data) return;

const message = JSON.parse(JSON.stringify(data));
if (message.ts?.$date) {
console.log(message.ts?.$date);
message.ts = message.ts.$date;
}
if (!message.ts) {
message.ts = new Date().toISOString();
}
this.onMessageCallbacks.map((callback) => callback(message));
this.onMessageCallbacks.forEach((callback) => callback(message));
});
await this.rcClient.subscribe(
"stream-notify-room",
`${this.rid}/user-activity`
);
await this.rcClient.onStreamData(
"stream-notify-room",
(ddpMessage: any) => {
const [roomId, event] = ddpMessage.fields.eventName.split("/");

if (roomId !== this.rid) {
return;
}
await this.ddp.subscribe("stream-notify-room", [
`${this.rid}/user-activity`,
]);

if (event === "user-activity") {
const typingUser = ddpMessage.fields.args[0];
const isTyping = ddpMessage.fields.args[1]?.includes("user-typing");
this.handleTypingEvent({ typingUser, isTyping });
}
this.ddp.on("stream-notify-room", (ddpMessage: any) => {
const eventName = ddpMessage?.fields?.eventName;
if (!eventName) return;

if (event === "typing") {
const typingUser = ddpMessage.fields.args[0];
const isTyping = ddpMessage.fields.args[1];
this.handleTypingEvent({ typingUser, isTyping });
}
if (event === "deleteMessage") {
const messageId = ddpMessage.fields.args[0]?._id;
this.onMessageDeleteCallbacks.map((callback) =>
callback(messageId)
);
}
const [roomId, event] = eventName.split("/");

if (roomId !== this.rid) {
return;
}
);
await this.rcClient.subscribeNotifyUser();
await this.rcClient.onStreamData(
"stream-notify-user",
(ddpMessage: any) => {
const [, event] = ddpMessage.fields.eventName.split("/");
const args: any[] = ddpMessage.fields.args
? Array.isArray(ddpMessage.fields.args)
? ddpMessage.fields.args
: [ddpMessage.fields.args]
: [];
if (event === "message") {
const data = args[0];
if (!data || data?.rid !== this.rid) {
return;
}
const message = JSON.parse(JSON.stringify(data));
if (message.ts?.$date) {
message.ts = message.ts.$date;
}
if (!message.ts) {
message.ts = new Date().toISOString();
}
message.renderType = "blocks";
this.onMessageCallbacks.map((callback) => callback(message));
} else if (event === "uiInteraction") {
this.onUiInteractionCallbacks.forEach((callback) =>
callback(args[0])
);
}

if (event === "user-activity") {
const args = ddpMessage.fields.args;
const typingUser = args[0];
const isTyping = args[1]?.includes("user-typing");
this.handleTypingEvent({ typingUser, isTyping });
}
);

if (event === "deleteMessage") {
const args = ddpMessage.fields.args;
const messageId = args[0]?._id;
this.onMessageDeleteCallbacks.forEach((callback) =>
callback(messageId)
);
}
});

if (userId) {
await this.ddp.subscribe("stream-notify-user", [`${userId}/message`]);
await this.ddp.subscribe("stream-notify-user", [
`${userId}/uiInteraction`,
]);
}

this.ddp.on("stream-notify-user", (ddpMessage: any) => {
const eventName = ddpMessage?.fields?.eventName;
if (!eventName) return;

const [, event] = eventName.split("/");
const args = ddpMessage.fields.args || [];

if (event === "uiInteraction") {
this.onUiInteractionCallbacks.forEach((callback) =>
callback(args[0])
);
}
});
} catch (err) {
console.error("DDP Connection Failed", err);
await this.close();
}
}
Expand Down Expand Up @@ -358,14 +349,12 @@ export default class EmbeddedChatApi {
typingUser: string;
isTyping: boolean;
}) {
// don't wait for more than 2 seconds. Though in practical, the waiting time is insignificant.
setTimeout(() => {
typingHandlerLock = 0;
}, 2000);
// eslint-disable-next-line no-empty
while (typingHandlerLock) {}
typingHandlerLock = 1;
// move user to front if typing else remove it.
const idx = this.typingUsers.indexOf(typingUser);
if (idx !== -1) {
this.typingUsers.splice(idx, 1);
Expand Down Expand Up @@ -538,17 +527,10 @@ export default class EmbeddedChatApi {
}

async close() {
await this.rcClient.unsubscribeAll();
await this.rcClient.disconnect();
this.ddp.unsubscribeAll();
this.ddp.disconnect();
}

/**
* @param {boolean} anonymousMode
* @param {Object} options This object should include query or fields.
* query - json object which accepts MongoDB query operators.
* fields - json object with properties that have either 1 or 0 to include them or exclude them
* @returns messages
*/
async getMessages(
anonymousMode = false,
options: {
Expand Down Expand Up @@ -726,20 +708,18 @@ export default class EmbeddedChatApi {

async sendTypingStatus(username: string, typing: boolean) {
try {
this.rcClient.methodCall(
"stream-notify-room",
await this.ddp.call("stream-notify-room", [
`${this.rid}/user-activity`,
username,
typing ? ["user-typing"] : []
);
typing ? ["user-typing"] : [],
]);
} catch (err) {
console.error(err);
}
}

/**
* @param {*} message should be a string or an rc message object
* Refer https://developer.rocket.chat/reference/api/schema-definition/message#message-object
*/
async sendMessage(message: any, threadId: string) {
const messageObj =
Expand Down Expand Up @@ -1261,4 +1241,4 @@ export default class EmbeddedChatApi {
const data = response.json();
return data;
}
}
}
Loading