Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions chasm/lib/nexusoperation/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,20 @@ func (l *Library) Components() []*chasm.RegistrableComponent {

func (l *Library) Tasks() []*chasm.RegistrableTask {
return []*chasm.RegistrableTask{
chasm.NewRegistrableSideEffectTask("invocation", l.operationInvocationTaskHandler),
chasm.NewRegistrableSideEffectTask(
"invocation",
l.operationInvocationTaskHandler,
chasm.WithSideEffectTaskGroup(TaskGroupName),
),
chasm.NewRegistrablePureTask("invocationBackoff", l.operationBackoffTaskHandler),
chasm.NewRegistrablePureTask("scheduleToStartTimeout", l.operationScheduleToStartTimeoutTaskHandler),
chasm.NewRegistrablePureTask("startToCloseTimeout", l.operationStartToCloseTimeoutTaskHandler),
chasm.NewRegistrablePureTask("scheduleToCloseTimeout", l.operationScheduleToCloseTimeoutTaskHandler),
chasm.NewRegistrableSideEffectTask("cancellation", l.cancellationInvocationTaskHandler),
chasm.NewRegistrableSideEffectTask(
"cancellation",
l.cancellationInvocationTaskHandler,
chasm.WithSideEffectTaskGroup(TaskGroupName),
),
chasm.NewRegistrablePureTask("cancellationBackoff", l.cancellationBackoffTaskHandler),
}
}
Expand Down
3 changes: 3 additions & 0 deletions chasm/lib/nexusoperation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type InvocationData struct {
NexusLink nexus.Link
}

// TaskGroupName groups invocation and cancellation together for the outbound queue
const TaskGroupName = "nexus"

// OperationStore defines the interface that must be implemented by any parent component that wants to manage Nexus operations.
// It's the responsibility of the parrent component to apply the appropriate state transitions to the operation.
type OperationStore interface {
Expand Down
21 changes: 21 additions & 0 deletions chasm/registrable_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type (
sideEffectTaskExecuteFn sideEffectTaskExecuteFn
sideEffectTaskDiscardFn sideEffectTaskDiscardFn
isPureTask bool
outboundTaskGroup string // For grouping on the outbound queue. See [WithSideEffectTaskGroup] for details.

// Those two fields are initialized when the component is registered to a library.
library namer
Expand Down Expand Up @@ -155,9 +156,19 @@ func (rt *RegistrableTask) registerToLibrary(

fqn := rt.fqType()
rt.taskTypeID = GenerateTypeID(fqn)
// If outboundTaskGroup wasn't set on creation default it here,
// since this is the first place we will have the fqn.
if rt.outboundTaskGroup == "" {
rt.outboundTaskGroup = fqn
}
return fqn, rt.taskTypeID, nil
}

// TaskGroup returns the side-effect task group for the task.
func (rt *RegistrableTask) TaskGroup() string {
return rt.outboundTaskGroup
}

// GoType returns the reflect.Type of the task's Go struct.
func (rt *RegistrableTask) GoType() reflect.Type {
return rt.goType
Expand All @@ -173,3 +184,13 @@ func (rt *RegistrableTask) fqType() string {
}
return FullyQualifiedName(rt.library.Name(), rt.taskType)
}

// WithSideEffectTaskGroup sets the task group for the task. The task group is used when
// the side effect's destination is specified for grouping semantics on the outbound queue,
// affects multi-cursor and the circuit breaker.
// If task group isn't provided, the task group will default to the fully qualified name at library registration.
func WithSideEffectTaskGroup(taskgroup string) RegistrableTaskOption {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know @yycptt said he'd rather use "side effect" for the naming here. I would prefer we use outbound because that's the only place where it's applicable. I would be okay with side-effect too or just WithTaskGroup but don't like the inconsistency between the name of the field and the name of this option.

As long as we're consistent, I'm fine with whatever.

return func(rt *RegistrableTask) {
rt.outboundTaskGroup = taskgroup
}
}
3 changes: 3 additions & 0 deletions chasm/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (s *RegistryTestSuite) TestRegistry_RegisterTasks_Success() {
chasm.NewRegistrableSideEffectTask(
"Task1",
chasm.NewMockSideEffectTaskHandler[*chasm.MockComponent, testTask1](ctrl),
chasm.WithSideEffectTaskGroup("test-task-group"),
),
chasm.NewRegistrablePureTask(
"Task2",
Expand All @@ -133,6 +134,7 @@ func (s *RegistryTestSuite) TestRegistry_RegisterTasks_Success() {
rt1, ok := r.Task("TestLibrary.Task1")
require.True(s.T(), ok)
require.Equal(s.T(), "TestLibrary.Task1", rt1.FqType())
s.Require().Equal("test-task-group", rt1.TaskGroup())

missingRT, ok := r.Task("TestLibrary.TaskMissing")
require.False(s.T(), ok)
Expand All @@ -142,6 +144,7 @@ func (s *RegistryTestSuite) TestRegistry_RegisterTasks_Success() {
rt2, ok := r.TaskFor(tInstance1)
require.True(s.T(), ok)
require.Equal(s.T(), "TestLibrary.Task2", rt2.FqType())
s.Require().Equal(rt2.FqType(), rt2.TaskGroup())

rt2, ok = r.TaskOf(reflect.TypeOf(tInstance1))
require.True(s.T(), ok)
Expand Down
17 changes: 17 additions & 0 deletions service/history/outbound_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package history
import (
"fmt"

"go.temporal.io/server/chasm"
"go.temporal.io/server/client"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -307,6 +308,7 @@ func (f *outboundQueueFactory) CreateQueue(
logger,
metricsHandler,
factory,
outboundTaskGroupPostProcessor(f.ChasmRegistry),
)
}

Expand Down Expand Up @@ -359,3 +361,18 @@ func getNamespaceNameOrDefault(
}
return nsName.String()
}

func outboundTaskGroupPostProcessor(registry *chasm.Registry) func([]tasks.Task) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this being here as long as you can guarantee that the task group is populated on any chasm task before the property is read anywhere. Can you confirm that Test for the predicate works?

if registry == nil {
return nil
}
return func(taskSlice []tasks.Task) {
for _, t := range taskSlice {
if ct, ok := t.(*tasks.ChasmTask); ok {
if rt, ok := registry.TaskByID(ct.Info.GetTypeId()); ok {
ct.SetOutboundTaskGroup(rt.TaskGroup())
}
}
}
}
}
8 changes: 4 additions & 4 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,10 +744,10 @@ func (e *executableImpl) GetDestination() string {
return ""
}

// StateMachineTaskType returns the embedded task's state machine task type if it exists. Defaults to 0.
func (e *executableImpl) StateMachineTaskType() string {
if t, ok := e.Task.(tasks.HasStateMachineTaskType); ok {
return t.StateMachineTaskType()
// OutboundTaskGroup returns the embedded task's outbound task group. Defaults to an empty string.
func (e *executableImpl) OutboundTaskGroup() string {
if t, ok := e.Task.(tasks.HasOutboundTaskGroup); ok {
return t.OutboundTaskGroup()
}
return ""
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func (GrouperStateMachineNamespaceIDAndDestination) KeyTyped(task tasks.Task) (k
if ok {
dest = destGetter.GetDestination()
}
smtGetter, ok := task.(tasks.HasStateMachineTaskType)
smtGetter, ok := task.(tasks.HasOutboundTaskGroup)
var smt string
if ok {
smt = smtGetter.StateMachineTaskType()
smt = smtGetter.OutboundTaskGroup()
}
return tasks.TaskGroupNamespaceIDAndDestination{
TaskGroup: smt,
Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func GetOutboundTaskTypeTagValue(

switch task := task.(type) {
case *tasks.StateMachineOutboundTask:
return prefix + "." + task.StateMachineTaskType()
return prefix + "." + task.OutboundTaskGroup()
case *tasks.ChasmTask:
return prefix + "." + getCHASMTaskTypeTagValue(task, chasmRegistry)
case *tasks.WorkerCommandsTask:
Expand Down
7 changes: 7 additions & 0 deletions service/history/queues/queue_immediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type (

notifyCh chan struct{}
}

taskPostProcessorFn func([]tasks.Task)
)

func NewImmediateQueue(
Expand All @@ -37,6 +39,7 @@ func NewImmediateQueue(
logger log.Logger,
metricsHandler metrics.Handler,
factory ExecutableFactory,
taskPostProcessor taskPostProcessorFn,
) *immediateQueue {
paginationFnProvider := func(r Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
Expand All @@ -57,6 +60,10 @@ func NewImmediateQueue(
return nil, nil, err
}

if taskPostProcessor != nil {
taskPostProcessor(resp.Tasks)
}

return resp.Tasks, resp.NextPageToken, nil
}
}
Expand Down
1 change: 1 addition & 0 deletions service/history/queues/queue_immediate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (s *immediateQueueSuite) SetupTest() {
log.NewTestLogger(),
metrics.NoopMetricsHandler,
nil, // execuable factory
nil, // taskPostProcessor
)
}

Expand Down
18 changes: 17 additions & 1 deletion service/history/tasks/chasm_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@ type ChasmTask struct {
Info *persistencespb.ChasmTaskInfo

// In-memory only
DeserializedTask reflect.Value
outboundTaskGroup string // set to the registered task's taskgroup after deserialization for outbound tasks
DeserializedTask reflect.Value
}

var _ Task = &ChasmTask{}
var _ HasArchetypeID = &ChasmTask{}
var _ HasOutboundTaskGroup = &ChasmTask{}
var _ HasDestination = &ChasmTask{}

func (t *ChasmTask) GetCategory() Category {
return t.Category
Expand Down Expand Up @@ -105,6 +108,19 @@ func (t *ChasmTask) GetVisibilityTime() time.Time {
func (t *ChasmTask) SetVisibilityTime(timestamp time.Time) {
t.VisibilityTimestamp = timestamp
}

func (t *ChasmTask) GetArchetypeID() uint32 {
return t.Info.GetArchetypeId()
}

func (t *ChasmTask) GetDestination() string {
return t.Destination
}

func (t *ChasmTask) OutboundTaskGroup() string {
return t.outboundTaskGroup
}

func (t *ChasmTask) SetOutboundTaskGroup(taskgroup string) {
t.outboundTaskGroup = taskgroup
}
8 changes: 4 additions & 4 deletions service/history/tasks/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ func NewOutboundTaskGroupPredicate(
}

func (n *OutboundTaskGroupPredicate) Test(task Task) bool {
smTask, ok := task.(HasStateMachineTaskType)
smTask, ok := task.(HasOutboundTaskGroup)
if !ok {
return false
}
_, ok = n.Groups[smTask.StateMachineTaskType()]
_, ok = n.Groups[smTask.OutboundTaskGroup()]
return ok
}

Expand Down Expand Up @@ -204,8 +204,8 @@ func (t *OutboundTaskPredicate) Test(task Task) bool {
group := TaskGroupNamespaceIDAndDestination{
NamespaceID: task.GetNamespaceID(),
}
if smTask, ok := task.(HasStateMachineTaskType); ok {
group.TaskGroup = smTask.StateMachineTaskType()
if smTask, ok := task.(HasOutboundTaskGroup); ok {
group.TaskGroup = smTask.OutboundTaskGroup()
}
if dTask, ok := task.(HasDestination); ok {
group.Destination = dTask.GetDestination()
Expand Down
4 changes: 2 additions & 2 deletions service/history/tasks/state_machine_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type StateMachineTask struct {
Info *persistencespb.StateMachineTaskInfo
}

var _ HasStateMachineTaskType = &StateMachineTask{}
var _ HasOutboundTaskGroup = &StateMachineTask{}

func (t *StateMachineTask) GetTaskID() int64 {
return t.TaskID
Expand All @@ -34,7 +34,7 @@ func (t *StateMachineTask) SetVisibilityTime(timestamp time.Time) {
t.VisibilityTimestamp = timestamp
}

func (t *StateMachineTask) StateMachineTaskType() string {
func (t *StateMachineTask) OutboundTaskGroup() string {
return t.Info.Type
}

Expand Down
7 changes: 4 additions & 3 deletions service/history/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ type (
GetVersion() int64
}

// HasStateMachineTaskType must be implemented by all HSM state machine tasks.
HasStateMachineTaskType interface {
StateMachineTaskType() string
// HasOutboundTaskGroup must be implemented by all tasks that participate in outbound task grouping.
HasOutboundTaskGroup interface {
OutboundTaskGroup() string
}

// HasDestination must be implemented by all tasks used in the outbound queue.
HasDestination interface {
GetDestination() string
Expand Down
34 changes: 17 additions & 17 deletions service/history/tasks/task_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions service/history/transfer_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,6 @@ func (f *transferQueueFactory) CreateQueue(
logger,
metricsHandler,
factory,
nil, // taskPostProcessor
)
}
1 change: 1 addition & 0 deletions service/history/visibility_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,6 @@ func (f *visibilityQueueFactory) CreateQueue(
logger,
metricsHandler,
factory,
nil, // taskPostProcessor
)
}
Loading