Skip to content

Commit f4ecf1a

Browse files
💾 Feat(Workflow, PluginServer): To ensure the workflow and the plugin server can receive the response of the plugin(functions).
1 parent 51679b0 commit f4ecf1a

5 files changed

Lines changed: 113 additions & 10 deletions

File tree

KitX Clients/KitX Core/KitX.Core/Device/PluginsServer.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,17 @@ public class PluginsServer : IPluginServer
8585
/// </summary>
8686
public event EventHandler<PluginUnregisteredEventArgs>? PluginUnregistered;
8787

88+
/// <summary>
89+
/// Event raised when a plugin sends a response (has RequestId)
90+
/// </summary>
91+
public event EventHandler<PluginResponseEventArgs>? PluginResponse;
92+
8893
/// <summary>
8994
/// Private constructor
9095
/// </summary>
91-
private PluginsServer() { }
96+
private PluginsServer()
97+
{
98+
}
9299

93100
/// <summary>
94101
/// Initializes the server
@@ -116,6 +123,17 @@ public IPluginServer Run()
116123

117124
_status = ServerStatus.Starting;
118125

126+
// Initialize RealPluginManager when server starts, so it can receive plugin messages
127+
try
128+
{
129+
_ = new KitX.Core.Workflow.RealPluginManager(this);
130+
Log.Information("[PluginsServer] RealPluginManager initialized for message handling");
131+
}
132+
catch (Exception ex)
133+
{
134+
Log.Warning(ex, "[PluginsServer] Failed to initialize RealPluginManager");
135+
}
136+
119137
const int maxRetries = 5;
120138
const int startPort = 7777;
121139
int currentPort = startPort;
@@ -224,13 +242,21 @@ public IPluginServer Run()
224242
Log.Warning(ex, "Error handling plugin message");
225243
}
226244

245+
Log.Information($"[PluginsServer] Invoking PluginMessageReceived event for connection {connectionId}");
227246
PluginMessageReceived?.Invoke(this, new PluginMessageReceivedEventArgs
228247
{
229248
ConnectionId = connectionId,
230249
Message = message
231250
});
232251
};
233252

253+
// Forward PluginResponse events from PluginConnection to PluginsServer.PluginResponse
254+
connection.PluginResponse += (sender, args) =>
255+
{
256+
Log.Information($"[PluginsServer] Forwarding PluginResponse event, RequestId: {args.RequestId}");
257+
PluginResponse?.Invoke(this, args);
258+
};
259+
234260
connection.Initialize();
235261

236262
PluginConnected?.Invoke(this, new PluginConnectedEventArgs

KitX Clients/KitX Core/KitX.Core/Workflow/RealPluginManager.cs

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Threading.Tasks;
77
using Kscript.CSharp.Parser.Core;
88
using Kscript.CSharp.Parser.Models;
9+
using KitX.Core.Contract.Plugin;
910
using KitX.Core.Device;
1011
using KitX.Shared.CSharp.Plugin;
1112
using KitX.Shared.CSharp.WebCommand;
@@ -49,6 +50,38 @@ public RealPluginManager(PluginsServer pluginsServer)
4950

5051
// 订阅插件消息接收事件以处理响应
5152
_pluginsServer.PluginMessageReceived += OnPluginMessageReceived;
53+
54+
// 订阅插件响应事件(当插件返回带RequestId的响应时触发)
55+
_pluginsServer.PluginResponse += OnPluginResponse;
56+
}
57+
58+
/// <summary>
59+
/// 处理插件响应事件
60+
/// </summary>
61+
private void OnPluginResponse(object? sender, PluginResponseEventArgs e)
62+
{
63+
try
64+
{
65+
Log.Information($"[RealPluginManager] OnPluginResponse called with RequestId: {e.RequestId}");
66+
67+
if (_pendingResponses.TryRemove(e.RequestId, out var tcs))
68+
{
69+
var command = JsonSerializer.Deserialize<Command>(e.Content, _serializerOptions);
70+
var responseBody = command.BodyLength > 0
71+
? Encoding.UTF8.GetString(command.Body.AsSpan(0, command.BodyLength))
72+
: string.Empty;
73+
Log.Information($"[RealPluginManager] Setting result from PluginResponse: {responseBody}");
74+
tcs.SetResult(responseBody);
75+
}
76+
else
77+
{
78+
Log.Warning($"[RealPluginManager] RequestId {e.RequestId} not found in pending responses");
79+
}
80+
}
81+
catch (Exception ex)
82+
{
83+
Log.Warning(ex, "[RealPluginManager] Error handling plugin response");
84+
}
5285
}
5386

5487
/// <summary>
@@ -58,22 +91,36 @@ private void OnPluginMessageReceived(object? sender, PluginMessageReceivedEventA
5891
{
5992
try
6093
{
94+
Log.Information($"[RealPluginManager] OnPluginMessageReceived called with message: {e.Message?.Substring(0, Math.Min(200, e.Message?.Length ?? 0))}...");
95+
6196
var kwc = JsonSerializer.Deserialize<Request>(e.Message, _serializerOptions);
62-
if (kwc?.Content is null) return;
97+
if (kwc?.Content is null)
98+
{
99+
Log.Information($"[RealPluginManager] kwc or kwc.Content is null");
100+
return;
101+
}
102+
103+
Log.Information($"[RealPluginManager] kwc.Content: {kwc.Content.Substring(0, Math.Min(100, kwc.Content.Length))}...");
63104

64105
var command = JsonSerializer.Deserialize<Command>(kwc.Content, _serializerOptions);
65106
if (command.Request is null) return; // Command is a struct, check if Request is empty
66107

67108
// 检查是否是响应消息
68109
if (command.Tags != null && command.Tags.TryGetValue("RequestId", out var requestId))
69110
{
111+
Log.Information($"[RealPluginManager] Found RequestId: {requestId}");
70112
if (_pendingResponses.TryRemove(requestId, out var tcs))
71113
{
72114
var responseBody = command.BodyLength > 0
73115
? Encoding.UTF8.GetString(command.Body.AsSpan(0, command.BodyLength))
74116
: string.Empty;
117+
Log.Information($"[RealPluginManager] Setting result: {responseBody}");
75118
tcs.SetResult(responseBody);
76119
}
120+
else
121+
{
122+
Log.Warning($"[RealPluginManager] RequestId {requestId} not found in pending responses");
123+
}
77124
}
78125
}
79126
catch (Exception ex)
@@ -178,10 +225,24 @@ private async Task<string> CallAsync(PluginCallInfo callInfo)
178225

179226
Log.Information($"[RealPluginManager] Sent request to {callInfo.PluginName}.{callInfo.MethodName}, RequestId: {requestId}");
180227

181-
// 注意: Loader 在处理 ReceiveCommand 后不会返回响应
182-
// 对于 void 方法,我们直接返回空结果
183-
// 对于有返回值的方法,需要插件支持返回响应(当前不支持)
184-
return string.Empty;
228+
// 等待插件响应,设置超时
229+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
230+
try
231+
{
232+
return await tcs.Task.WaitAsync(cts.Token);
233+
}
234+
catch (TimeoutException)
235+
{
236+
Log.Warning($"[RealPluginManager] Request {requestId} timed out");
237+
_pendingResponses.TryRemove(requestId, out _);
238+
throw new TimeoutException($"Plugin call timed out: {callInfo.PluginName}.{callInfo.MethodName}");
239+
}
240+
catch (OperationCanceledException)
241+
{
242+
Log.Warning($"[RealPluginManager] Request {requestId} was cancelled");
243+
_pendingResponses.TryRemove(requestId, out _);
244+
throw;
245+
}
185246
}
186247
catch (Exception ex)
187248
{

KitX Clients/KitX Core/KitX.Core/Workflow/WorkflowScriptService.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,25 @@ public class WorkflowScriptService : IWorkflowService
4949
private List<PluginInfo> _availablePlugins { get; set; } = new();
5050

5151
/// <summary>
52-
/// Private constructor
52+
/// Private constructor - initializes RealPluginManager immediately
5353
/// </summary>
54-
private WorkflowScriptService() { }
54+
private WorkflowScriptService()
55+
{
56+
// Pre-initialize RealPluginManager to ensure it subscribes to plugin events
57+
// This must be done at startup, not when first script is executed
58+
try
59+
{
60+
var pluginsServer = PluginsServer.Instance;
61+
var realPluginManager = new RealPluginManager(pluginsServer);
62+
Parser.SetPluginManager(realPluginManager);
63+
_isParserInitialized = true;
64+
Log.Information("[WorkflowScriptService] Real plugin manager pre-initialized at startup");
65+
}
66+
catch (Exception ex)
67+
{
68+
Log.Warning(ex, "[WorkflowScriptService] Failed to pre-initialize RealPluginManager, will retry on first script execution");
69+
}
70+
}
5571

5672
/// <summary>
5773
/// Gets the script engine, creating it if necessary

KitX SDK

0 commit comments

Comments
 (0)