Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions chasm/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package chasm

import (
"context"
"errors"
"time"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
)

type Context interface {
Expand All @@ -27,6 +30,10 @@ type Context interface {
ExecutionCloseTime() time.Time
// Logger returns a logger tagged with execution key and other chasm framework internal information.
Logger() log.Logger
// NamespaceEntry returns the namespace entry for the execution.
NamespaceEntry() *namespace.Namespace
// EndpointByName resolves a nexus endpoint entry.
EndpointByName(endpointName string) (*persistencespb.NexusEndpointEntry, error)
// MetricsHandler returns a metrics handler with bare minimum tags (no namespace tag).
MetricsHandler() metrics.Handler
// Value returns the value associated with this context for key. The behavior is the same as context.Context.Value().
Expand All @@ -46,6 +53,10 @@ type Context interface {
goContext() context.Context
}

type EndpointRegistry interface {
GetByName(ctx context.Context, namespaceID namespace.ID, endpointName string) (*persistencespb.NexusEndpointEntry, error)
}

type MutableContext interface {
Context

Expand All @@ -54,9 +65,6 @@ type MutableContext interface {
// referencing the component.
AddTask(Component, TaskAttributes, any)

// Add more methods here for other storage commands/primitives.
// e.g. HistoryEvent

// Get a Ref for the component
// This ref to the component state at the end of the transition
// Same as Ref(Component) method in Context,
Expand Down Expand Up @@ -160,10 +168,22 @@ func (c *immutableCtx) structuredRef(component Component) (ComponentRef, error)
return c.root.structuredRef(component)
}

func (c *immutableCtx) NamespaceEntry() *namespace.Namespace {
return c.root.backend.GetNamespaceEntry()
}

func (c *immutableCtx) goContext() context.Context {
return c.ctx
}

func (c *immutableCtx) EndpointByName(name string) (*persistencespb.NexusEndpointEntry, error) {
reg := c.root.backend.EndpointRegistry()
if reg == nil {
return nil, errors.New("endpoint registry not available")
}
return reg.GetByName(c.ctx, c.NamespaceEntry().ID(), name)
}

// NewMutableContext creates a new MutableContext from an existing Context and root Node.
//
// NOTE: Library authors should not invoke this constructor directly, and instead use the [UpdateComponent],
Expand Down
26 changes: 24 additions & 2 deletions chasm/context_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package chasm

import (
"context"
"errors"
"slices"
"sync"
"time"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
)

var _ Context = (*MockContext)(nil)
Expand All @@ -21,10 +24,12 @@ type MockContext struct {
HandleRef func(component Component) ([]byte, error)
HandleExecutionCloseTime func() time.Time
HandleStateTransitionCount func() int64
HandleLibrary func(name string) (Library, bool)
HandleNamespaceEntry func() *namespace.Namespace
HandleEndpointByName func(string) (*persistencespb.NexusEndpointEntry, error)
HandleMetricsHandler func() metrics.Handler

ctx context.Context
HandleLibrary func(name string) (Library, bool)
ctx context.Context
}

func (c *MockContext) goContext() context.Context {
Expand All @@ -34,6 +39,13 @@ func (c *MockContext) goContext() context.Context {
return c.ctx
}

func (c *MockContext) EndpointByName(name string) (*persistencespb.NexusEndpointEntry, error) {
if c.HandleEndpointByName != nil {
return c.HandleEndpointByName(name)
}
return nil, errors.New("endpoint registry not available")
}

func (c *MockContext) Now(cmp Component) time.Time {
if c.HandleNow != nil {
return c.HandleNow(cmp)
Expand Down Expand Up @@ -73,6 +85,13 @@ func (c *MockContext) StateTransitionCount() int64 {
return 0
}

func (c *MockContext) NamespaceEntry() *namespace.Namespace {
if c.HandleNamespaceEntry != nil {
return c.HandleNamespaceEntry()
}
return nil
}

func (c *MockContext) Logger() log.Logger {
executionKey := c.ExecutionKey()
return log.NewTestLogger().With(
Expand Down Expand Up @@ -100,6 +119,9 @@ func (c *MockContext) withValue(key any, value any) Context {
HandleRef: c.HandleRef,
HandleExecutionCloseTime: c.HandleExecutionCloseTime,
HandleStateTransitionCount: c.HandleStateTransitionCount,
HandleLibrary: c.HandleLibrary,
HandleNamespaceEntry: c.HandleNamespaceEntry,
HandleEndpointByName: c.HandleEndpointByName,
HandleMetricsHandler: c.HandleMetricsHandler,
ctx: context.WithValue(c.goContext(), key, value),
}
Expand Down
42 changes: 0 additions & 42 deletions chasm/lib/nexusoperation/commands.go

This file was deleted.

1 change: 0 additions & 1 deletion chasm/lib/nexusoperation/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ var Module = fx.Module(
fx.Provide(NewCancellationBackoffTaskExecutor),
fx.Provide(newLibrary),
fx.Invoke(register),
fx.Invoke(registerCommandHandlers),
)

func register(
Expand Down
12 changes: 9 additions & 3 deletions chasm/lib/nexusoperation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,27 @@ package nexusoperation

import (
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
nexusoperationpb "go.temporal.io/server/chasm/lib/nexusoperation/gen/nexusoperationpb/v1"
)

var _ chasm.Component = (*Operation)(nil)
var _ chasm.StateMachine[nexusoperationpb.OperationStatus] = (*Operation)(nil)

type OperationStore any

type Operation struct {
chasm.UnimplementedComponent

// Persisted internal state
*nexusoperationpb.OperationState

// Pointer to an implementation of the "store". For a workflow-based Nexus operation
// this is a parent pointer back to the workflow. For a standalone Nexus operation this is nil.
Store chasm.ParentPtr[OperationStore]
}

func NewOperation() *Operation {
return &Operation{}
func NewOperation(state *nexusoperationpb.OperationState) *Operation {
return &Operation{OperationState: state}
}

func (o *Operation) LifecycleState(_ chasm.Context) chasm.LifecycleState {
Expand Down
Loading
Loading