Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259)

### Added

- Added `MetadataSet` to stage job metadata updates from worker middleware, `HookWorkBegin`, workers, or `HookWorkEnd`, with changes persisted when the job is completed. [PR #1269](https://github.com/riverqueue/river/pull/1269)

### Fixed

⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259)

### Changed

- Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259)
Expand Down
7 changes: 3 additions & 4 deletions internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/riverqueue/river/internal/jobstats"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/riverpilot"
Expand Down Expand Up @@ -578,11 +579,9 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer
)

for attempt := 1; attempt <= numRetries; attempt++ {
const timeout = 10 * time.Second

// I've found that we want at least ten seconds for a large batch,
// although it usually doesn't need that long.
ctx, cancel := context.WithTimeout(uncancelledCtx, timeout)
ctx, cancel := context.WithTimeout(uncancelledCtx, rivercommon.HotOperationTimeout)
defer cancel()

retVal, err := retryFunc(ctx)
Expand All @@ -603,7 +602,7 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer
slog.Int("attempt", attempt),
slog.String("err", err.Error()),
slog.String("sleep_duration", sleepDuration.String()),
slog.String("timeout", timeout.String()),
slog.String("timeout", rivercommon.HotOperationTimeout.String()),
)
if !disableSleep {
serviceutil.CancellableSleep(logCtx, sleepDuration)
Expand Down
11 changes: 11 additions & 0 deletions internal/rivercommon/river_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rivercommon
import (
"errors"
"regexp"
"time"
)

// These constants are made available in rivercommon so that they're accessible
Expand All @@ -17,6 +18,16 @@ const (
QueueDefault = "default"
)

// HotOperationTimeout attempts to standardize timeouts for some "hot"
// operations like locking available jobs or completing finished jobs. It's
// somewhat questionable whether it makes sense to share timing on these
// queries, but for the time being it makes more sense than each part of the
// code randomly choosing its own timing.
//
// We probably want to have another look at this in the not-too-distant future
// to make sure we can't do anything a bit smarter when it comes to timeouts.
const HotOperationTimeout = 10 * time.Second

const (
// MetadataKeyPeriodicJobID is a metadata key inserted with a periodic job
// when a configured periodic job has its ID property set. This lets
Expand Down
6 changes: 2 additions & 4 deletions rivershared/riverpilot/standard_pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package riverpilot
import (
"context"
"sync/atomic"
"time"

"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivertype"
)

const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second

type StandardPilot struct {
seq atomic.Int64
}
Expand All @@ -23,7 +21,7 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex
return nil, nil
}

ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded)
ctx, cancel := context.WithTimeout(ctx, rivercommon.HotOperationTimeout)
defer cancel()

return exec.JobGetAvailable(ctx, params)
Expand Down
Loading