19 changes: 17 additions & 2 deletions .github/.golangci.yml
@@ -38,6 +38,12 @@ linters:
msg: "Please avoid using panic in application code"
- pattern: time\.Now
msg: "Using time.Now is not allowed in chasm/lib package (non-test files), use ctx.Now(component) instead"
- pattern: time\.NewTimer
msg: "Do not use time.NewTimer directly; use TimeSource.NewTimer() for testability with fake time."
- pattern: time\.AfterFunc
msg: "Do not use time.AfterFunc directly; use TimeSource.AfterFunc() for testability with fake time."
- pattern: time\.After\(
msg: "Do not use time.After directly; use TimeSource.NewTimer() for testability with fake time."
- pattern: "^Unix$"
msg: "Do not use .Unix() for Cassandra timestamps (returns seconds). Use p.UnixMilliseconds() which returns milliseconds."
- pattern: "^UnixMilli$"
@@ -165,11 +171,11 @@ linters:
text: "time.Sleep"
linters:
- forbidigo
- path-except: chasm/lib/.*\.go$
- path-except: chasm/lib/.*\.go$|service/matching/.*\.go$
text: "time.Now"
linters:
- forbidigo
- path: chasm/lib/.*_test\.go$
- path: chasm/lib/.*_test\.go$|service/matching/.*_test\.go$
text: "time.Now"
linters:
- forbidigo
@@ -178,6 +184,15 @@ linters:
text: "Unix|UnixMilli|UnixNano"
linters:
- forbidigo
# time.NewTimer/AfterFunc/After rules only apply to matching service
- path-except: service/matching/.*\.go$
text: "time.NewTimer|time.AfterFunc|time.After"
linters:
- forbidigo
- path: service/matching/.*_test\.go$
text: "time.NewTimer|time.AfterFunc|time.After"
linters:
- forbidigo
# Allow in tests
- path: _test\.go$
text: "Unix|UnixMilli|UnixNano"
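Taken together, the new forbidigo patterns funnel matching-service code to an injected clock.TimeSource. Below is a minimal sketch of the pattern the rules enforce; the helper is hypothetical and not part of this diff, but the NewTimer return shape matches its use in fair_backlog_manager.go further down.

```go
package matching

import (
	"context"
	"time"

	"go.temporal.io/server/common/clock"
)

// waitOrDone is a hypothetical helper showing the approved pattern:
// instead of <-time.After(d), which can neither be stopped early nor be
// driven by fake time, create the timer via an injected clock.TimeSource
// and stop it on the early-exit path.
func waitOrDone(ctx context.Context, ts clock.TimeSource, d time.Duration) bool {
	timerC, timer := ts.NewTimer(d) // (<-chan time.Time, clock.Timer)
	defer timer.Stop()              // release the timer if ctx is canceled first
	select {
	case <-ctx.Done():
		return false
	case <-timerC:
		return true
	}
}
```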
3 changes: 2 additions & 1 deletion service/matching/ack_manager_test.go
@@ -8,6 +8,7 @@ import (

"github.com/stretchr/testify/suite"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
@@ -230,6 +231,6 @@ func newTestAckMgr(logger log.Logger) *ackManager {
f, _ := tqid.NewTaskQueueFamily("", "test-queue")
prtn := f.TaskQueue(enumspb.TASK_QUEUE_TYPE_WORKFLOW).NormalPartition(0)
tlCfg := newTaskQueueConfig(prtn.TaskQueue(), cfg, "test-namespace")
db := newTaskQueueDB(tlCfg, tm, UnversionedQueueKey(prtn), logger, metrics.NoopMetricsHandler, false)
db := newTaskQueueDB(tlCfg, tm, UnversionedQueueKey(prtn), logger, metrics.NoopMetricsHandler, false, clock.NewRealTimeSource())
return newAckManager(db, logger)
}
6 changes: 5 additions & 1 deletion service/matching/backlog_manager.go
@@ -12,6 +12,7 @@ import (
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/future"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
@@ -68,6 +69,7 @@ type (
throttledLogger log.ThrottledLogger
matchingClient matchingservice.MatchingServiceClient
metricsHandler metrics.Handler
systemClock clock.TimeSource
initializedError *future.FutureImpl[struct{}]
// skipFinalUpdate controls behavior on Stop: if it's false, we try to write one final
// update before unloading
@@ -86,6 +88,7 @@ func newBacklogManager(
throttledLogger log.ThrottledLogger,
matchingClient matchingservice.MatchingServiceClient,
metricsHandler metrics.Handler,
systemClock clock.TimeSource,
) *backlogManagerImpl {
bmg := &backlogManagerImpl{
pqMgr: pqMgr,
@@ -95,10 +98,11 @@
logger: logger,
throttledLogger: throttledLogger,
config: config,
systemClock: systemClock,
initializedError: future.NewFuture[struct{}](),
}
isDraining := false // newBacklogManager can't be used for draining
bmg.db = newTaskQueueDB(config, taskManager, pqMgr.QueueKey(), logger, metricsHandler, isDraining)
bmg.db = newTaskQueueDB(config, taskManager, pqMgr.QueueKey(), logger, metricsHandler, isDraining, systemClock)
bmg.taskWriter = newTaskWriter(bmg)
bmg.taskReader = newTaskReader(bmg)
bmg.taskAckManager = newAckManager(bmg.db, logger)
5 changes: 5 additions & 0 deletions service/matching/backlog_manager_test.go
@@ -16,6 +16,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
@@ -79,6 +80,7 @@ func (s *BacklogManagerTestSuite) SetupTest() {
s.ptqMgr.EXPECT().QueueKey().Return(queue).AnyTimes()
s.ptqMgr.EXPECT().ProcessSpooledTask(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
s.ptqMgr.EXPECT().GetFairnessWeightOverrides().AnyTimes().Return(fairnessWeightOverrides{ /* To avoid deadlock with gomock method */ })
s.ptqMgr.EXPECT().TimeSource().Return(clock.NewRealTimeSource()).AnyTimes()

var ctx context.Context
ctx, s.cancelCtx = context.WithCancel(context.Background())
@@ -96,6 +98,7 @@ func (s *BacklogManagerTestSuite) SetupTest() {
metrics.NoopMetricsHandler,
func() counter.Counter { return counter.NewMapCounter() },
false,
clock.NewRealTimeSource(),
)
} else if s.newMatcher {
s.blm = newPriBacklogManager(
@@ -108,6 +111,7 @@
nil,
metrics.NoopMetricsHandler,
false,
clock.NewRealTimeSource(),
)
} else {
s.blm = newBacklogManager(
@@ -119,6 +123,7 @@
s.logger,
nil,
metrics.NoopMetricsHandler,
clock.NewRealTimeSource(),
)
}
}
30 changes: 17 additions & 13 deletions service/matching/db.go
@@ -11,6 +11,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
@@ -39,6 +40,7 @@ type (
store persistence.TaskManager
logger log.Logger
metricsHandler metrics.Handler
systemClock clock.TimeSource

// mutable
sync.Mutex
@@ -96,6 +98,7 @@ func newTaskQueueDB(
logger log.Logger,
metricsHandler metrics.Handler,
isDraining bool,
systemClock clock.TimeSource,
) *taskQueueDB {
return &taskQueueDB{
config: config,
Expand All @@ -104,6 +107,7 @@ func newTaskQueueDB(
store: store,
logger: logger,
metricsHandler: metricsHandler,
systemClock: systemClock,
}
}

@@ -192,7 +196,7 @@ func (db *taskQueueDB) takeOverTaskQueueLocked(
db.rangeID = 0
return err
}
db.lastWrite = time.Now()
db.lastWrite = db.systemClock.Now()
// We took over the task queue and are not sure what tasks may have been written
// before. Set max read level id of all subqueues to just before our new block.
maxReadLevel := rangeIDToTaskIDBlock(db.rangeID, db.config.RangeSize).start - 1
@@ -218,7 +222,7 @@ func (db *taskQueueDB) takeOverTaskQueueLocked(
db.rangeID = 0
return err
}
db.lastWrite = time.Now()
db.lastWrite = db.systemClock.Now()
// In this case, ensureDefaultSubqueuesLocked already initialized subqueue 0 to have
// ackLevel and maxReadLevel 0, so we don't need to initialize them.
softassert.That(db.logger, db.subqueues[0].maxReadLevel == 0, "should have maxReadLevel 0 here")
@@ -287,7 +291,7 @@ func (db *taskQueueDB) SyncState(ctx context.Context) error {
// We only need to write if something changed, or if we're past half of the sticky queue TTL.
// Note that we use the same threshold for non-sticky queues even though they don't have a
// persistence TTL, since the scavenger looks for metadata that hasn't been updated in 48 hours.
needWrite := db.lastChange.After(db.lastWrite) || time.Since(db.lastWrite) > stickyTaskQueueTTL/2
needWrite := db.lastChange.After(db.lastWrite) || db.systemClock.Since(db.lastWrite) > stickyTaskQueueTTL/2
if !needWrite {
return nil
}
@@ -308,19 +312,19 @@ func (db *taskQueueDB) updateAckLevelAndBacklogStats(subqueue subqueueIndex, new
tag.Any("new-ack-level", newAckLevel))
}
if dbQueue.AckLevel != newAckLevel {
db.lastChange = time.Now()
db.lastChange = db.systemClock.Now()
dbQueue.AckLevel = newAckLevel
}

if newAckLevel == db.getMaxReadLevelLocked(subqueue) {
// Reset approximateBacklogCount to fix the count divergence issue
if dbQueue.ApproximateBacklogCount != 0 || !dbQueue.oldestTime.Equal(oldestTime) {
db.lastChange = time.Now()
db.lastChange = db.systemClock.Now()
dbQueue.ApproximateBacklogCount = 0
dbQueue.oldestTime = oldestTime
}
} else if countDelta != 0 {
db.lastChange = time.Now()
db.lastChange = db.systemClock.Now()
db.updateBacklogStatsLocked(subqueue, countDelta, oldestTime)
}
}
@@ -356,7 +360,7 @@ func (db *taskQueueDB) setKnownFairBacklogCount(subqueue subqueueIndex, count in
defer db.Unlock()

if db.subqueues[subqueue].ApproximateBacklogCount != count {
db.lastChange = time.Now()
db.lastChange = db.systemClock.Now()
db.subqueues[subqueue].ApproximateBacklogCount = count
if count == 0 {
db.subqueues[subqueue].oldestTime = time.Time{}
@@ -493,9 +497,9 @@ func (db *taskQueueDB) CreateTasks(
// Only update lastWrite for persistence implementations that update metadata on CreateTasks,
// otherwise we have a change to ApproximateBacklogCount we need to write.
if resp.UpdatedMetadata {
db.lastWrite = time.Now()
db.lastWrite = db.systemClock.Now()
} else {
db.lastChange = time.Now()
db.lastChange = db.systemClock.Now()
}
} else if _, ok := err.(*persistence.ConditionFailedError); ok {
// tasks definitely were not created, restore the counter. For other errors tasks may or may not be created.
@@ -566,9 +570,9 @@ func (db *taskQueueDB) CreateFairTasks(
// Only update lastWrite for persistence implementations that update metadata on CreateTasks,
// otherwise we have a change to ApproximateBacklogCount we need to write.
if resp.UpdatedMetadata {
db.lastWrite = time.Now()
db.lastWrite = db.systemClock.Now()
} else {
db.lastChange = time.Now()
db.lastChange = db.systemClock.Now()
}
} else if _, ok := err.(*persistence.ConditionFailedError); ok {
// Tasks definitely were not created, restore the counter. For other errors tasks may or may not be created.
@@ -708,7 +712,7 @@ func (db *taskQueueDB) expiryTime() *timestamppb.Timestamp {
case enumspb.TASK_QUEUE_KIND_NORMAL:
return nil
case enumspb.TASK_QUEUE_KIND_STICKY:
return timestamppb.New(time.Now().Add(stickyTaskQueueTTL))
return timestamppb.New(db.systemClock.Now().Add(stickyTaskQueueTTL))
default:
panic(fmt.Sprintf("taskQueueDB encountered unknown task kind: %v", db.queue.Partition().Kind()))
}
@@ -766,7 +770,7 @@ func (db *taskQueueDB) emitBacklogGaugesLocked() {
if oldestTime.IsZero() {
metrics.ApproximateBacklogAgeSeconds.With(db.metricsHandler).Record(0)
} else {
metrics.ApproximateBacklogAgeSeconds.With(db.metricsHandler).Record(time.Since(oldestTime).Seconds())
metrics.ApproximateBacklogAgeSeconds.With(db.metricsHandler).Record(db.systemClock.Since(oldestTime).Seconds())
}
metrics.TaskLagPerTaskQueueGauge.With(db.metricsHandler).Record(float64(totalLag))
}
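With systemClock injected, taskQueueDB's staleness bookkeeping (lastWrite/lastChange, the stickyTaskQueueTTL/2 threshold in SyncState, sticky expiry) becomes testable without real waiting. A self-contained sketch of the fake clock a test could pass as the new constructor argument, assuming common/clock's EventTimeSource with Update/Advance and a 24h stickyTaskQueueTTL:

```go
package matching_test

import (
	"fmt"
	"time"

	"go.temporal.io/server/common/clock"
)

// Sketch only: an EventTimeSource stands still until the test moves it,
// so a test can jump straight past stickyTaskQueueTTL/2 and assert that
// SyncState chooses to persist metadata, with no sleeping involved.
func Example_fakeClockForTaskQueueDB() {
	fake := clock.NewEventTimeSource()
	fake.Update(time.Unix(1700000000, 0)) // pin the clock to a known instant

	lastWrite := fake.Now()      // what takeOverTaskQueueLocked would record
	fake.Advance(13 * time.Hour) // past half of an assumed 24h sticky TTL

	fmt.Println(fake.Since(lastWrite) > 12*time.Hour)
	// Output: true
}
```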
24 changes: 15 additions & 9 deletions service/matching/fair_backlog_manager.go
@@ -14,6 +14,7 @@ import (
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/future"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
@@ -26,12 +27,13 @@ import (

type (
fairBacklogManagerImpl struct {
pqMgr physicalTaskQueueManager
config *taskQueueConfig
tqCtx context.Context
isDraining bool
db *taskQueueDB
taskWriter *fairTaskWriter
pqMgr physicalTaskQueueManager
config *taskQueueConfig
tqCtx context.Context
isDraining bool
systemClock clock.TimeSource
db *taskQueueDB
taskWriter *fairTaskWriter

subqueueLock sync.Mutex
subqueues []*fairTaskReader // subqueue index -> fairTaskReader
@@ -62,6 +64,7 @@ func newFairBacklogManager(
metricsHandler metrics.Handler,
counterFactory func() counter.Counter,
isDraining bool,
systemClock clock.TimeSource,
) *fairBacklogManagerImpl {
// For the purposes of taskQueueDB, call this just a TaskManager. It'll return errors if we
use it incorrectly. TODO(fairness): consider a cleaner way of doing this.
@@ -72,7 +75,8 @@
config: config,
tqCtx: tqCtx,
isDraining: isDraining,
db: newTaskQueueDB(config, taskManager, pqMgr.QueueKey(), logger, metricsHandler, isDraining),
systemClock: systemClock,
db: newTaskQueueDB(config, taskManager, pqMgr.QueueKey(), logger, metricsHandler, isDraining, systemClock),
subqueuesByPriority: make(map[priorityKey]subqueueIndex),
priorityBySubqueue: make(map[subqueueIndex]priorityKey),
matchingClient: matchingClient,
@@ -208,10 +212,12 @@ func (c *fairBacklogManagerImpl) getSubqueueForPriority(priority priorityKey) su

func (c *fairBacklogManagerImpl) periodicSync() {
for {
timerC, timer := c.systemClock.NewTimer(c.config.UpdateAckInterval())
select {
case <-c.tqCtx.Done():
timer.Stop()
return
case <-time.After(c.config.UpdateAckInterval()):
case <-timerC:
ctx, cancel := context.WithTimeout(c.tqCtx, ioTimeout)
err := c.db.SyncState(ctx)
cancel()
@@ -296,7 +302,7 @@ func (c *fairBacklogManagerImpl) BacklogStatsByPriority() map[int32]*taskqueuepb
// Find the greatest backlog age across all subqueues for the same priority.
oldestBacklogTime := c.subqueues[subqueueIdx].getOldestBacklogTime()
if !oldestBacklogTime.IsZero() {
oldestBacklogAge := time.Since(oldestBacklogTime)
oldestBacklogAge := c.systemClock.Since(oldestBacklogTime)
if oldestBacklogAge > result[pk].ApproximateBacklogAge.AsDuration() {
result[pk].ApproximateBacklogAge = durationpb.New(oldestBacklogAge)
}
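The periodicSync rewrite is the canonical case for the new lint rules: <-time.After(interval) allocated a timer per iteration that shutdown could not stop and fake time could not fire. The new shutdown path relies on Stop preventing delivery, which a test can check deterministically. A sketch under the same EventTimeSource assumption as above:

```go
package matching_test

import (
	"fmt"
	"time"

	"go.temporal.io/server/common/clock"
)

// Sketch only: a stopped TimeSource timer stays silent even after fake
// time passes its deadline, which is the property periodicSync now relies
// on when tqCtx is canceled.
func Example_stoppedTimerNeverFires() {
	fake := clock.NewEventTimeSource()
	timerC, timer := fake.NewTimer(time.Minute)

	timer.Stop()                  // the shutdown path
	fake.Advance(2 * time.Minute) // deadline passes in fake time

	select {
	case <-timerC:
		fmt.Println("fired")
	default:
		fmt.Println("stopped timer stayed silent")
	}
	// Output: stopped timer stayed silent
}
```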
12 changes: 6 additions & 6 deletions service/matching/fair_task_reader.go
@@ -33,7 +33,7 @@ type (
lock sync.Mutex

readPending bool
backoffTimer *time.Timer
backoffTimer clock.Timer
retrier backoff.Retrier
addRetries *semaphore.Weighted

@@ -91,7 +91,7 @@ func newFairTaskReader(
logger: backlogMgr.logger,
retrier: backoff.NewRetrier(
common.CreateReadTaskRetryPolicy(),
clock.NewRealTimeSource(),
backlogMgr.systemClock,
),
backlogAge: newBacklogAgeTracker(),
addRetries: semaphore.NewWeighted(concurrentAddRetries),
Expand All @@ -103,7 +103,7 @@ func newFairTaskReader(
evictedAcks: btree.NewBTreeG(fairLevel.less),

// gc state
lastGCTime: time.Now(),
lastGCTime: backlogMgr.systemClock.Now(),
}
}

@@ -521,7 +521,7 @@ func (tr *fairTaskReader) retryReadAfter(duration time.Duration) {
defer tr.lock.Unlock()

if tr.backoffTimer == nil {
tr.backoffTimer = time.AfterFunc(duration, func() {
tr.backoffTimer = tr.backlogMgr.systemClock.AfterFunc(duration, func() {
tr.lock.Lock()
defer tr.lock.Unlock()
tr.backoffTimer = nil
@@ -627,7 +627,7 @@ func (tr *fairTaskReader) maybeGCLocked() {
return
}
tr.inGC = true
tr.lastGCTime = time.Now()
tr.lastGCTime = tr.backlogMgr.systemClock.Now()
// gc in new goroutine so poller doesn't have to wait
go tr.doGC(tr.ackLevel)
}
@@ -637,7 +637,7 @@ func (tr *fairTaskReader) shouldGCLocked() bool {
return false
}
return tr.numToGC >= tr.backlogMgr.config.MaxTaskDeleteBatchSize() ||
time.Since(tr.lastGCTime) > tr.backlogMgr.config.TaskDeleteInterval()
tr.backlogMgr.systemClock.Since(tr.lastGCTime) > tr.backlogMgr.config.TaskDeleteInterval()
}

// called in new goroutine
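The reader changes follow the same motive: backoffTimer becomes a clock.Timer, the GC and backoff timestamps come from backlogMgr.systemClock, and the backoff.Retrier now shares that clock rather than constructing its own real one. A sketch of what this buys a test, assuming EventTimeSource.AfterFunc fires its callback once Advance crosses the deadline; the 30s stands in for the retrier-computed backoff in retryReadAfter:

```go
package matching_test

import (
	"fmt"
	"time"

	"go.temporal.io/server/common/clock"
)

// Sketch only: the callback stands in for fairTaskReader re-dispatching
// a read; under fake time the whole backoff elapses instantly.
func Example_backoffUnderFakeTime() {
	fake := clock.NewEventTimeSource()
	fired := make(chan struct{})

	var backoffTimer clock.Timer = fake.AfterFunc(30*time.Second, func() {
		close(fired)
	})
	defer backoffTimer.Stop()

	fake.Advance(30 * time.Second) // backoff deadline reached in fake time
	<-fired
	fmt.Println("retry ran without real waiting")
	// Output: retry ran without real waiting
}
```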