Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions core/capabilities/mocks/executable_and_trigger_capability.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions core/capabilities/mocks/trigger_capability.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions core/capabilities/mocks/trigger_executable.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions core/capabilities/remote/combined_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ func (c *combinedClient) UnregisterTrigger(ctx context.Context, request capabili
return subscriber.UnregisterTrigger(ctx, request)
}

func (c *combinedClient) AckEvent(ctx context.Context, triggerID string, eventID string) error {
c.mu.RLock()
defer c.mu.RUnlock()
for _, trigger := range c.triggerSubscribers {
info, err := trigger.Info(ctx)
if err != nil {
return err
}
if info.ID == triggerID {
return trigger.AckEvent(ctx, triggerID, eventID)
}
}
return fmt.Errorf("could not find trigger %q triggerID", triggerID)
}

func (c *combinedClient) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
return errors.New("RegisterToWorkflow is not supported by remote capabilities")
}
Expand Down
47 changes: 46 additions & 1 deletion core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ type triggerPublisher struct {

messageCache *messagecache.MessageCache[registrationKey, p2ptypes.PeerID]
registrations map[registrationKey]*pubRegState
mu sync.RWMutex // protects messageCache and registrations
ackCache *messagecache.MessageCache[ackKey, p2ptypes.PeerID]
mu sync.RWMutex // protects messageCache, ackCache, and registrations

batchingQueue map[[32]byte]*batchedResponse
bqMu sync.Mutex // protects batchingQueue
stopCh services.StopChan
Expand All @@ -57,6 +59,11 @@ type registrationKey struct {
workflowID string
}

type ackKey struct {
callerDonID uint32
triggerEventID string
}

type pubRegState struct {
callback <-chan commoncap.TriggerResponse
request commoncap.TriggerRegistrationRequest
Expand Down Expand Up @@ -86,6 +93,7 @@ func NewTriggerPublisher(capabilityID string, capMethodName string, dispatcher t
capMethodName: capMethodName,
dispatcher: dispatcher,
messageCache: messagecache.NewMessageCache[registrationKey, p2ptypes.PeerID](),
ackCache: messagecache.NewMessageCache[ackKey, p2ptypes.PeerID](),
registrations: make(map[registrationKey]*pubRegState),
batchingQueue: make(map[[32]byte]*batchedResponse),
stopCh: make(services.StopChan),
Expand Down Expand Up @@ -245,6 +253,43 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
case types.MethodTriggerEvent:
p.lggr.Errorw("trigger request failed with error",
"method", SanitizeLogString(msg.Method), "sender", sender, "errorMsg", SanitizeLogString(msg.ErrorMsg))
case types.MethodTriggerEventAck:
triggerMetadata := msg.GetTriggerEventMetadata()
if triggerMetadata == nil {
p.lggr.Errorw("recieved empty trigger event ack metadata", "sender", sender)
break
}
triggerEventID := triggerMetadata.TriggerEventId
p.lggr.Debugw("received trigger event ACK", "sender", sender, "trigger event ID", triggerEventID)

p.mu.Lock()
defer p.mu.Unlock()
callerDon, ok := cfg.workflowDONs[msg.CallerDonId]
if !ok {
p.lggr.Errorw("received a message from unsupported workflow DON", "callerDonId", msg.CallerDonId)
return
}
if !cfg.membersCache[msg.CallerDonId][sender] {
p.lggr.Errorw("sender not a member of its workflow DON", "callerDonId", msg.CallerDonId, "sender", sender)
return
}

key := ackKey{msg.CallerDonId, triggerEventID}
nowMs := time.Now().UnixMilli()
p.ackCache.Insert(key, sender, nowMs, msg.Payload) // TODO: Payload is empty..
minRequired := uint32(2*callerDon.F + 1)
ready, _ := p.ackCache.Ready(key, minRequired, nowMs-cfg.remoteConfig.EventTimeout.Milliseconds(), false)
if !ready {
p.lggr.Debugw("not ready to ACK trigger event yet", "triggerEventId", triggerEventID, "minRequired", minRequired)
return
}

ctx, cancel := p.stopCh.NewCtx()
defer cancel()
err = cfg.underlying.AckEvent(ctx, p.capabilityID, triggerEventID)
if err != nil {
p.lggr.Errorw("failed to AckEvent on underlying trigger capability", "err", err)
}
default:
p.lggr.Errorw("received message with unknown method",
"method", SanitizeLogString(msg.Method), "sender", sender)
Expand Down
33 changes: 33 additions & 0 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,20 @@ func TestTriggerPublisher_ReceiveTriggerEvents_BatchingEnabled(t *testing.T) {
require.NoError(t, publisher.Close())
}

func TestTriggerPublisher_RecieveTriggerEventAcks(t *testing.T) {
ctx := testutils.Context(t)
capabilityDONID, workflowDONID := uint32(1), uint32(2)
underlyingTriggerCap, publisher, _, peers := newServices(t, capabilityDONID, workflowDONID, 2)
eventId := "123"
regEvent := newAckEventMessage(t, eventId, workflowDONID, peers[1])
publisher.Receive(ctx, regEvent)

require.True(t, underlyingTriggerCap.eventAckd)
require.NoError(t, publisher.Close())

// TODO: Increase event ACK test coverage
}

func TestTriggerPublisher_SetConfig_Basic(t *testing.T) {
t.Parallel()
lggr := logger.Test(t)
Expand Down Expand Up @@ -266,10 +280,24 @@ func newRegisterTriggerMessage(t *testing.T, callerDonID uint32, sender p2ptypes
}
}

func newAckEventMessage(t *testing.T, eventId string, callerDonID uint32, sender p2ptypes.PeerID) *remotetypes.MessageBody {
return &remotetypes.MessageBody{
Sender: sender[:],
Method: remotetypes.MethodTriggerEventAck,
CallerDonId: callerDonID,
Metadata: &remotetypes.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &remotetypes.TriggerEventMetadata{
TriggerEventId: eventId,
},
},
}
}

type testTrigger struct {
info commoncap.CapabilityInfo
registrationsCh chan commoncap.TriggerRegistrationRequest
eventCh chan commoncap.TriggerResponse
eventAckd bool
}

func (tr *testTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error) {
Expand All @@ -284,3 +312,8 @@ func (tr *testTrigger) RegisterTrigger(_ context.Context, request commoncap.Trig
func (tr *testTrigger) UnregisterTrigger(_ context.Context, request commoncap.TriggerRegistrationRequest) error {
return nil
}

func (tr *testTrigger) AckEvent(_ context.Context, triggerId string, eventId string) error {
tr.eventAckd = true
return nil
}
29 changes: 29 additions & 0 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,35 @@ func (s *triggerSubscriber) Info(ctx context.Context) (commoncap.CapabilityInfo,
return cfg.capInfo, nil
}

func (s *triggerSubscriber) AckEvent(ctx context.Context, triggerId, eventId string) error {
if s.capabilityID != triggerId {
return fmt.Errorf("AckEvent invariant violation: triggerId=%q was dispatched to the wrong capability (capabilityID=%q)", triggerId, s.capabilityID)
}

s.mu.RLock()
cfg := s.cfg.Load()
for _, peerID := range cfg.capDonInfo.Members {
m := &types.MessageBody{
CapabilityId: cfg.capInfo.ID,
CapabilityDonId: cfg.capDonInfo.ID,
CallerDonId: cfg.localDonID,
Method: types.MethodTriggerEventAck,
CapabilityMethod: s.capMethodName,
Metadata: &types.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &types.TriggerEventMetadata{
TriggerEventId: eventId,
},
},
}
err := s.dispatcher.Send(peerID, m)
if err != nil {
s.lggr.Errorw("failed to send message", "donId", cfg.capDonInfo.ID, "peerId", peerID, "err", err)
}
}
s.mu.RUnlock()
return nil
}

func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commoncap.TriggerRegistrationRequest) (<-chan commoncap.TriggerResponse, error) {
rawRequest, err := pb.MarshalTriggerRegistrationRequest(request)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/types/messages.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/capabilities/remote/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
MethodUnRegisterTrigger = "UnregisterTrigger"
MethodTriggerEvent = "TriggerEvent"
MethodExecute = "Execute"
MethodTriggerEventAck = "TriggerEventACK"
)

type Dispatcher interface {
Expand Down
4 changes: 4 additions & 0 deletions core/capabilities/webapi/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func (h *triggerConnectorHandler) UnregisterTrigger(ctx context.Context, req cap
return nil
}

func (h *triggerConnectorHandler) AckEvent(ctx context.Context, triggerID, eventID string) error {
return nil
}

func (h *triggerConnectorHandler) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
return h.CapabilityInfo, nil
}
Expand Down
Loading
Loading