Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions packages/components/nodes/tools/MCP/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@ export class MCPToolkit extends BaseToolkit {
client: Client | null = null
serverParams: StdioServerParameters | any
transportType: 'stdio' | 'sse'
/** Per-invocation HTTP headers injected at tools/call time; overrides static toolkit headers for the same names. */
getToolCallHeaders?: () => Promise<Record<string, string>>
constructor(serverParams: StdioServerParameters | any, transportType: 'stdio' | 'sse') {
super()
this.serverParams = serverParams
this.transportType = transportType
}

// Method to create a new client with transport
async createClient(): Promise<Client> {
/**
* Creates a new MCP client and connects it via the configured transport.
* @param injectHeaders - Additional HTTP headers merged over static `serverParams.headers` for this connection. Used to pass per-invocation headers (e.g. from {@link getToolCallHeaders}) into SSE/HTTP transports.
*/
async createClient(injectHeaders: Record<string, string> = {}): Promise<Client> {
const client = new Client(
{
name: 'flowise-client',
Expand Down Expand Up @@ -54,28 +59,30 @@ export class MCPToolkit extends BaseToolkit {

const baseUrl = new URL(this.serverParams.url)
await checkDenyList(this.serverParams.url)
const mergedHeaders = { ...this.serverParams?.headers, ...injectHeaders }
const headers = Object.keys(mergedHeaders).length > 0 ? mergedHeaders : undefined
try {
if (this.serverParams.headers) {
if (headers) {
transport = new StreamableHTTPClientTransport(baseUrl, {
requestInit: {
headers: this.serverParams.headers
headers
}
})
} else {
transport = new StreamableHTTPClientTransport(baseUrl)
}
await client.connect(transport)
} catch (error) {
if (this.serverParams.headers) {
if (headers) {
transport = new SSEClientTransport(baseUrl, {
requestInit: {
headers: this.serverParams.headers
headers
},
eventSourceInit: {
fetch: async (url, init) => {
return secureFetch(url.toString(), {
...(init as any),
headers: this.serverParams.headers
headers
}) as any
}
}
Expand Down Expand Up @@ -148,7 +155,8 @@ export async function MCPTool({
return tool(
async (input): Promise<string> => {
// Create a new client for this request
const client = await toolkit.createClient()
const toolCallHeaders = await toolkit.getToolCallHeaders?.()
const client = await toolkit.createClient(toolCallHeaders)

try {
const req: CallToolRequest = { method: 'tools/call', params: { name: name, arguments: input as any } }
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/controllers/chatflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ const getChatflowByApiKey = async (req: Request, res: Response, next: NextFuncti
if (!apikey) {
return res.status(401).send('Unauthorized')
}
const apiResponse = await chatflowsService.getChatflowByApiKey(apikey.id, req.query.keyonly)
const apiResponse = await chatflowsService.getChatflowByApiKey(apikey.id, apikey.workspaceId, req.query.keyonly)
return res.json(apiResponse)
} catch (error) {
next(error)
Expand Down
17 changes: 11 additions & 6 deletions packages/server/src/services/chatflows/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ICommonObject, removeFolderFromStorage } from 'flowise-components'
import { StatusCodes } from 'http-status-codes'
import { In } from 'typeorm'
import { Brackets, In } from 'typeorm'
import { validate as isValidUUID } from 'uuid'
import { ChatflowType, IReactFlowObject } from '../../Interface'
import { FLOWISE_COUNTER_STATUS, FLOWISE_METRIC_COUNTERS } from '../../Interface.Metrics'
Expand Down Expand Up @@ -220,16 +220,21 @@ const getAllChatflowsCount = async (type?: ChatflowType, workspaceId?: string):
}
}

const getChatflowByApiKey = async (apiKeyId: string, keyonly?: unknown): Promise<any> => {
const getChatflowByApiKey = async (apiKeyId: string, workspaceId: string, keyonly?: unknown): Promise<any> => {
try {
// Here we only get chatflows that are bounded by the apikeyid and chatflows that are not bounded by any apikey
const appServer = getRunningExpressApp()
let query = appServer.AppDataSource.getRepository(ChatFlow)
.createQueryBuilder('cf')
.where('cf.apikeyid = :apikeyid', { apikeyid: apiKeyId })
if (keyonly === undefined) {
query = query.orWhere('cf.apikeyid IS NULL').orWhere('cf.apikeyid = ""')
}
.where('cf.workspaceId = :workspaceId', { workspaceId })
.andWhere(
new Brackets((qb) => {
qb.where('cf.apikeyid = :apikeyid', { apikeyid: apiKeyId })
if (keyonly === undefined) {
qb.orWhere('cf.apikeyid IS NULL').orWhere('cf.apikeyid = ""')
}
})
)

const dbResponse = await query.orderBy('cf.name', 'ASC').getMany()
if (dbResponse.length < 1) {
Expand Down