59 changes: 55 additions & 4 deletions README.md
@@ -19,7 +19,7 @@ This is NOT a fully featured CI pipeline solution.
<!-- TOC -->
* [Badges](#badges)
* [Components](#components)
* [prunner (this repository)](#prunner--this-repository-)
* [prunner (this repository)](#prunner-this-repository)
* [prunner-ui](#prunner-ui)
* [Flowpack.Prunner](#flowpackprunner)
* [User guide](#user-guide)
@@ -32,7 +32,10 @@ This is NOT a fully featured CI pipeline solution.
* [Limiting concurrency](#limiting-concurrency)
* [The wait list](#the-wait-list)
* [Debounce jobs with a start delay](#debounce-jobs-with-a-start-delay)
* [Partitioned Wait Lists](#partitioned-wait-lists)
* [Disabling fail-fast behavior](#disabling-fail-fast-behavior)
* [Error handling with on_error](#error-handling-with-on_error)
* [Important notes:](#important-notes)
* [Configuring retention period](#configuring-retention-period)
* [Handling of child processes](#handling-of-child-processes)
* [Graceful shutdown](#graceful-shutdown)
@@ -44,11 +47,11 @@ This is NOT a fully featured CI pipeline solution.
* [Development](#development)
* [Requirements](#requirements)
* [Running locally](#running-locally)
* [IDE Setup (IntelliJ/GoLand)](#ide-setup--intellijgoland-)
* [IDE Setup (IntelliJ/GoLand)](#ide-setup-intellijgoland)
* [Building for different operating systems.](#building-for-different-operating-systems)
* [Running Tests](#running-tests)
* [Memory Leak Debugging](#memory-leak-debugging)
* [Generate OpenAPI (Swagger) spec](#generate-openapi--swagger--spec)
* [Generate OpenAPI (Swagger) spec](#generate-openapi-swagger-spec)
* [Releasing](#releasing)
* [Security concept](#security-concept)
* [License](#license)
@@ -222,7 +225,8 @@ pipelines:

To deactivate the queuing altogether, set `queue_limit: 0`.

Now, if the queue is limited, an error occurs when it is full and you try to add a new job.
Now, if the queue is limited (and the default `queue_strategy: append` is configured),
an error occurs when it is full and you try to add a new job.

Alternatively, you can also set `queue_strategy: replace` to replace the last job in the
queue by the newly added one:
Expand Down Expand Up @@ -257,6 +261,7 @@ in form of a zero or positive decimal value with a time unit ("ms", "s", "m", "h
```yaml
pipelines:
  do_something:
    # NOTE: to prevent starvation, use a queue_limit of 2 or higher
    queue_limit: 1
    queue_strategy: replace
    concurrency: 1
@@ -265,6 +270,52 @@ pipelines:
    tasks: # as usual
```

NOTE: If you use `queue_limit: 1` together with `start_delay`, you will run into **starvation** (i.e. the job never starts)
if jobs are submitted more quickly than the `start_delay`. If you instead use `queue_limit: 2` or higher, you avoid
this issue: the job in the 1st slot is always worked on after `start_delay`, while only the 2nd slot gets replaced
by quick successive submissions.

### Partitioned Wait Lists

If you have a multi-tenant application, you might want to use **one wait-list per tenant** (e.g. for import jobs),
combined with global `concurrency` limits (depending on the globally available server resources).

To enable this, do the following:

- `queue_strategy: partitioned_replace`: enables the partitioned wait list.
- `queue_partition_limit: 1` (or higher): configures the number of wait-list slots per tenant. The last slot gets replaced
  when the wait-list of a partition is full.
- Both can be combined with `start_delay` and `concurrency` as usual.

Full example:

```yaml
pipelines:
  do_something:
    queue_strategy: partitioned_replace
    # prevent starvation
    queue_partition_limit: 2
    concurrency: 1
    # Queues a run of the job and only starts it after 10 seconds have passed (if no other run was triggered which replaced the queued job)
    start_delay: 10s
    tasks: # as usual
```

Additionally, when submitting a job, you need to specify the `queuePartition` argument:

```
POST /pipelines/schedule

{
"pipeline": "my_pipeline",
"variables": {
...
},
"queuePartition": "tenant_foo"
}
```


### Disabling fail-fast behavior

25 changes: 25 additions & 0 deletions definition/loader_test.go
@@ -23,3 +23,28 @@ func TestLoadRecursively_WithMissingDependency(t *testing.T) {
    _, err := LoadRecursively("../test/fixtures/missingDep.yml")
    require.EqualError(t, err, `loading ../test/fixtures/missingDep.yml: invalid pipeline definition "test_it": missing task "not_existing" referenced in depends_on of task "test"`)
}

func TestPartitionedWaitlist_OK(t *testing.T) {
    _, err := LoadRecursively("../test/fixtures/partitioned_waitlist.yml")
    require.NoError(t, err)
}

func TestPartitionedWaitlist_err_0_partition_limit(t *testing.T) {
    _, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_0_partition_limit.yml")
    require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_0_partition_limit.yml: invalid pipeline definition "test_it": queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace`)
}

func TestPartitionedWaitlist_err_no_partition_limit(t *testing.T) {
    _, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_no_partition_limit.yml")
    require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_no_partition_limit.yml: invalid pipeline definition "test_it": queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace`)
}

func TestPartitionedWaitlist_err_queue_limit(t *testing.T) {
    _, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_queue_limit.yml")
    require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_queue_limit.yml: invalid pipeline definition "test_it": queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead`)
}

func TestWaitlist_err_partitioned_queue_limit(t *testing.T) {
    _, err := LoadRecursively("../test/fixtures/waitlist_err_partitioned_queue_limit.yml")
    require.EqualError(t, err, `loading ../test/fixtures/waitlist_err_partitioned_queue_limit.yml: invalid pipeline definition "test_it": queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead`)
}
34 changes: 31 additions & 3 deletions definition/pipelines.go
@@ -52,10 +52,15 @@ type OnErrorTaskDef struct {
type PipelineDef struct {
    // Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
    Concurrency int `yaml:"concurrency"`
    // QueueLimit is the number of slots for queueing jobs if the allowed concurrency is exceeded, defaults to unbounded (nil)
    // QueueLimit is the number of slots for queueing jobs if the allowed concurrency is exceeded, defaults to unbounded (nil). Only allowed with queue_strategy=append|replace, not with partitioned_replace (there, use queue_partition_limit instead)
    QueueLimit *int `yaml:"queue_limit"`

    // QueuePartitionLimit is the number of slots for queueing jobs per partition, if queue_strategy=partitioned_replace is used.
    QueuePartitionLimit *int `yaml:"queue_partition_limit"`

    // QueueStrategy to use when adding jobs to the queue (defaults to append)
    QueueStrategy QueueStrategy `yaml:"queue_strategy"`

    // StartDelay will delay the start of a job if the value is greater than zero (defaults to 0)
    StartDelay time.Duration `yaml:"start_delay"`

Expand Down Expand Up @@ -100,6 +105,19 @@ func (d PipelineDef) validate() error {
return errors.New("start_delay needs queue_limit > 0")
}

    if d.QueueStrategy == QueueStrategyPartitionedReplace {
        if d.QueueLimit != nil {
            return errors.New("queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead")
        }
        if d.QueuePartitionLimit == nil || *d.QueuePartitionLimit < 1 {
            return errors.New("queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace")
        }
    } else {
        if d.QueuePartitionLimit != nil {
            return errors.New("queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead")
        }
    }

    for taskName, taskDef := range d.Tasks {
        for _, dependentTask := range taskDef.DependsOn {
            _, exists := d.Tasks[dependentTask]
Expand Down Expand Up @@ -164,13 +182,21 @@ func (d PipelineDef) Equals(otherDef PipelineDef) bool {
    return true
}

// QueueStrategy defines the behavior when jobs wait (=are queued) before pipeline execution.
type QueueStrategy int

const (
    // QueueStrategyAppend appends jobs to the queue until queue limit is reached
    // QueueStrategyAppend appends jobs to the queue until the queue limit is reached (FIFO)
    QueueStrategyAppend QueueStrategy = 0
    // QueueStrategyReplace replaces pending jobs (with same variables) instead of appending to the queue

    // QueueStrategyReplace replaces the **LAST** pending job if the queue limit is reached. If the queue is not yet full, the job is appended.
    // NOTE: if using queue_limit=1 + replace, this can lead to starvation when jobs are enqueued rapidly. If using queue_limit >= 2, this cannot happen anymore.
    // (see 2025_08_14_partitioned_waitlist.md for a detailed description)
    QueueStrategyReplace QueueStrategy = 1

    // QueueStrategyPartitionedReplace implements the "partitioned waitlist" strategy, as explained in 2025_08_14_partitioned_waitlist.md.
    // -> it replaces the **LAST** pending job of a given partition, if the partition is full (=queue_partition_limit).
    QueueStrategyPartitionedReplace QueueStrategy = 2
)

func (s *QueueStrategy) UnmarshalYAML(unmarshal func(interface{}) error) error {
@@ -185,6 +211,8 @@ func (s *QueueStrategy) UnmarshalYAML(unmarshal func(interface{}) error) error {
        *s = QueueStrategyAppend
    case "replace":
        *s = QueueStrategyReplace
    case "partitioned_replace":
        *s = QueueStrategyPartitionedReplace
    default:
        return errors.Errorf("unknown queue strategy: %q", strategyName)
    }
111 changes: 111 additions & 0 deletions docs/2025_08_14_partitioned_waitlist.md
@@ -0,0 +1,111 @@
# FEATURE: Partitioned Waitlists


# Problem description

## Current state

> **FROM README**
>
> By default, if you limit concurrency, and the limit is exceeded, further jobs are added to the
> waitlist of the pipeline.
>
> However, you have some options to configure this as well:
>
> The waitlist can have a maximum size, denoted by `queue_limit`:
>
> ```yaml
> pipelines:
>   do_something:
>     queue_limit: 1
>     concurrency: 1
>     tasks: # as usual
> ```
>
> To deactivate the queuing altogether, set `queue_limit: 0`.
>
> Now, if the queue is limited, an error occurs when it is full and you try to add a new job.
>
> Alternatively, you can also set `queue_strategy: replace` to replace the last job in the
> queue by the newly added one:

## Current state -> Problem "Starvation" with "Debounce" ??

> Queues a run of the job and only starts it after 10 seconds have passed (if no other run was triggered which replaced the queued job)

```
--> time (1 char = 1 s)

# 1 waitlist slot, delay 10 s

a____ _____ <-- here the job A is queued
            A   <-- here job A starts


# 1 waitlist slot, delay 10 s
# STARVATION CAN HAPPEN
a____ b____ c_____ _____
                      C


# 2 waitlist slots, delay 10 s
# !!! PREVENTS STARVATION !!!
a____ b__c_ ______ _____
[a_]  [ab]  A   <-- here, a starts. NO STARVATION
         [ac]
            [c_]
```

SUMMARY:
- Starvation can only happen if the waitlist size is 1; with a waitlist size of 2 (or bigger) it cannot happen, because only the LAST job gets replaced.
- We cannot debounce immediately; we ALWAYS wait at least for `start_delay` (not a problem for us right now).
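
To make this concrete, here is a small, self-contained Go simulation of the replace strategy (a sketch with assumed names such as `queuedJob` and `submit`, not actual prunner code). Jobs are submitted every 3 seconds with a `start_delay` of 10 seconds: with a wait-list size of 1 no job ever starts, with size 2 the job in the first slot always survives until its delay elapses.

```go
package main

import "fmt"

// queuedJob models one wait-list slot: the job becomes runnable once "now" reaches readyAt.
type queuedJob struct {
    name    string
    readyAt int // submit time + start_delay, in seconds
}

// submit appends to the wait list, or replaces the LAST slot when the limit is reached
// (the queue_strategy: replace behaviour described above).
func submit(waitList []queuedJob, j queuedJob, queueLimit int) []queuedJob {
    if len(waitList) >= queueLimit {
        waitList[len(waitList)-1] = j
        return waitList
    }
    return append(waitList, j)
}

func main() {
    const startDelay = 10
    for _, queueLimit := range []int{1, 2} {
        var waitList []queuedJob
        var started []string
        for now := 0; now <= 30; now++ {
            // start every queued job whose delay has elapsed
            remaining := waitList[:0]
            for _, j := range waitList {
                if now >= j.readyAt {
                    started = append(started, fmt.Sprintf("%s@%ds", j.name, now))
                } else {
                    remaining = append(remaining, j)
                }
            }
            waitList = remaining
            // a new job is submitted every 3 seconds, i.e. faster than start_delay
            if now%3 == 0 {
                j := queuedJob{name: fmt.Sprintf("job-t%d", now), readyAt: now + startDelay}
                waitList = submit(waitList, j, queueLimit)
            }
        }
        fmt.Printf("queue_limit=%d started=%v\n", queueLimit, started)
        // queue_limit=1 started=[]                                  <-- starvation
        // queue_limit=2 started=[job-t0@10s job-t9@19s job-t18@28s]
    }
}
```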

## problem description

In a project, we have 50 Neos instances, which use prunner for background tasks (some are short, some are long).

Currently, we have 4 pipelines globally:
- concurrency 1
- concurrency 8
- concurrency 4
- concurrency 4 (import job)
  - -> needs the global concurrency to limit the needed server resources

Now, a new pipeline should be added for "irregular import jobs" triggered by webhooks.
- they can arrive in very quick succession
- -> the pipeline should only start after a certain amount of time (newer runs should override older queued ones)
- `StartDelay` combined with QueueStrategy "Replace"
- `waitList[len(waitList)-1] = job` -> the *LAST* element of the wait list is replaced
- -> GLOBAL replacement does not work, because each job has arguments which are relevant (e.g. the tenant ID).

We still want to have a GLOBAL CONCURRENCY LIMIT (per pipeline), but a WAITLIST per instance.


## Solution Idea:

We want to be able to SEGMENT the waitlist into different partitions. The `queue_strategy` and `queue_limit` should be per partition.
`concurrency` stays per pipeline (as before).

```
**LOGICAL VIEW** (Idea)
┌──────────────────────┐      ┌────────────────────────────────┐
│ Waitlist Instance 1  │      │ Pipeline (with concurrency 2)  │
├──────────────────────┤      │                                │
│ Waitlist Instance 2  │  ->  ├────────────────────────────────┤
├──────────────────────┤      │                                │
│ Waitlist Instance 3  │      │                                │
└──────────────────────┘      └────────────────────────────────┘

if job is *delayed*,
stays in wait list
for this duration

```

Technically, we still have a SINGLE Wait List per pipeline, but the jobs can be partitioned by `waitlist_partition_id`.

-> In this case, the `replace` strategy will replace the last element of the given partition.

-> We create a new `queue_strategy` for the partitioned waitlist: `partitioned_replace`

If we partition the waitlist, the waitlist can grow up to `queue_partition_limit` * number of partitions.
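
A minimal Go sketch of the enqueue behaviour described above (names such as `job` and `enqueuePartitioned` are assumptions for illustration, not the actual prunner implementation):

```go
package main

import "fmt"

type job struct {
    partition string
    payload   string
}

// enqueuePartitioned appends j to the single wait list, unless the partition of j
// already holds queuePartitionLimit jobs - in that case the LAST job of that
// partition is replaced (all other partitions are left untouched).
func enqueuePartitioned(waitList []job, j job, queuePartitionLimit int) []job {
    count := 0
    lastIdx := -1
    for i, queued := range waitList {
        if queued.partition == j.partition {
            count++
            lastIdx = i
        }
    }
    if count >= queuePartitionLimit && lastIdx >= 0 {
        waitList[lastIdx] = j
        return waitList
    }
    return append(waitList, j)
}

func main() {
    var waitList []job
    // tenant_foo fills its partition (limit 1), so the second submission replaces the first;
    // tenant_bar gets its own slot and is not affected.
    waitList = enqueuePartitioned(waitList, job{"tenant_foo", "import #1"}, 1)
    waitList = enqueuePartitioned(waitList, job{"tenant_foo", "import #2"}, 1)
    waitList = enqueuePartitioned(waitList, job{"tenant_bar", "import #1"}, 1)
    fmt.Println(waitList) // [{tenant_foo import #2} {tenant_bar import #1}]
}
```

The `concurrency` handling is untouched by this: the scheduler still picks runnable jobs from the single wait list, regardless of their partition.
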
11 changes: 11 additions & 0 deletions helper/slice_utils/sliceUtils.go
@@ -0,0 +1,11 @@
package slice_utils

// Filter returns a new slice containing only the elements of s for which the predicate p returns true.
func Filter[T any](s []T, p func(i T) bool) []T {
    var result []T
    for _, i := range s {
        if p(i) {
            result = append(result, i)
        }
    }
    return result
}
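
For illustration, a hypothetical usage of the new `Filter` helper, e.g. selecting the wait-list entries of a single partition (the `waitListEntry` type and the import path are assumptions, not taken from this PR):

```go
package main

import (
    "fmt"

    "github.com/Flowpack/prunner/helper/slice_utils" // assumed import path
)

type waitListEntry struct {
    Pipeline       string
    QueuePartition string
}

func main() {
    entries := []waitListEntry{
        {Pipeline: "do_something", QueuePartition: "tenant_foo"},
        {Pipeline: "do_something", QueuePartition: "tenant_bar"},
        {Pipeline: "do_something", QueuePartition: "tenant_foo"},
    }

    // keep only the entries of partition "tenant_foo"
    fooEntries := slice_utils.Filter(entries, func(e waitListEntry) bool {
        return e.QueuePartition == "tenant_foo"
    })

    fmt.Println(len(fooEntries)) // 2
}
```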