91 changes: 67 additions & 24 deletions docs/openfaas-pro/jetstream.md
@@ -12,24 +12,13 @@ This page is primarily concerned with how to configure the Queue Worker.

You can learn about [asynchronous invocations here](/reference/async).

## Async use cases

Every function in OpenFaaS can be invoked either synchronously or asynchronously. Asynchronous invocations are retried automatically, and can return their response to a given endpoint via a webhook.

Popular use-cases include:

- Batch processing and machine learning
- Resilient data pipelines
- Receiving webhooks
- Long running jobs

On the blog we show reference examples built upon these architectural patterns:

- [Exploring the Fan out and Fan in pattern with OpenFaaS](https://www.openfaas.com/blog/fan-out-and-back-in-using-functions/)
- [Generate PDFs at scale on Kubernetes using OpenFaaS and Puppeteer](https://www.openfaas.com/blog/pdf-generation-at-scale-on-kubernetes/)
- [How to process your data the resilient way with back pressure](https://www.openfaas.com/blog/limits-and-backpressure/)
- [The Next Generation of Queuing: JetStream for OpenFaaS](https://www.openfaas.com/blog/jetstream-for-openfaas/)

## Terminology

* [NATS](https://nats.io/) - an open source messaging system hosted by the [CNCF](https://www.cncf.io/)
@@ -54,35 +43,88 @@ NATS can be configured with a quorum of at least 3 replicas so it can recover data

See [Deploy NATS for OpenFaaS](#deploy-nats-for-openfaas) for instructions on how to configure OpenFaaS and deploy NATS.

## Queue mode

The queue-worker can operate in two modes, configured via the `mode` parameter:

```yaml
jetstreamQueueWorker:
mode: static | function
```

* **`static`** (default) - the queue-worker uses a single shared NATS Consumer, scaled with the number of queue-worker replicas. Requests are processed in FIFO order, so a single function that receives a large burst of invocations can dominate the queue, starving invocations for other functions until the backlog clears. This mode is ideal for development or constrained environments.

* **`function`** - the queue-worker creates a dedicated NATS Consumer per function on demand. This gives each function its own view of the queue, so a burst of invocations for one function does not block or delay processing for other functions. It also enables [queue-based scaling](#queue-based-scaling-for-functions) and [adaptive concurrency](#adaptive-concurrency).

!!! info "OpenFaaS for Enterprises feature"
Function mode requires [OpenFaaS for Enterprises](https://openfaas.com/pricing/).

When using `function` mode, the `inactiveThreshold` parameter sets how long a function must go without invocations before it is considered inactive. If a function stays inactive beyond the threshold, the queue-worker deletes its NATS Consumer.

```yaml
jetstreamQueueWorker:
mode: function
consumer:
inactiveThreshold: 30s
```

## Features

### Queue-based scaling for functions

The queue-worker uses a shared NATS Stream and NATS Consumer by default, which works well with many of the existing [autoscaling strategies](/reference/async/#autoscaling). Requests are processed in a FIFO order, and it is possible for certain functions to dominate or starve the queue.
!!! info "OpenFaaS for Enterprises feature"
This feature requires [OpenFaaS for Enterprises](https://openfaas.com/pricing/) and [function mode](#queue-mode).

In [function mode](#queue-mode), each function gets a dedicated NATS Consumer, making it possible to scale individual functions based upon the depth of their queued asynchronous requests.

See: [autoscaling based on queue depth](/reference/autoscaling/)

It's challenging to scale individual functions based upon their queued requests within a shared queue with a single static consumer.
### Adaptive concurrency

Two alternatives exist for OpenFaaS for Enterprises customers, both of which enable fairer processing of messages across functions, and better autoscaling behaviour:
!!! info "OpenFaaS for Enterprises feature"
This feature requires [OpenFaaS for Enterprises](https://openfaas.com/pricing/) and [function mode](#queue-mode).

1. Setup additional named queue-worker deployments with specific functions allocated to them.
2. Configure the built-in queue-worker to partition itself by function, creating consumers on demand, and giving much of the effect of 1. with less operational overhead.
Without adaptive concurrency, the queue-worker sends invocations as fast as it can pull them from the queue. If a function cannot keep up, this can lead to:

With the approach below, functions can be scaled upon the depth of their queued asynchronous requests, with a consumer created for each function as and when it is needed.
- Excessive error responses and wasted retries
- The need to fine-tune retry parameters for each function individually

The `mode` parameter can be set to `static` (default) or `function`.
Adaptive concurrency automatically learns how many concurrent invocations each function can handle:

- Increases the limit after successful responses (2xx)
- Backs off after rejections (429)
- Adjusts in real-time as replicas scale up or down
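The adjustment loop above can be sketched as a simple additive-increase/multiplicative-decrease (AIMD) limiter. This is an illustration only; the queue-worker's actual algorithm and constants are internal to OpenFaaS:

```python
# Illustrative AIMD-style concurrency limiter: additive increase on
# success (2xx), multiplicative decrease on rejection (429).
# Not the queue-worker's real implementation - a sketch of the idea.

class ConcurrencyLimiter:
    def __init__(self, initial=1, floor=1, ceiling=100):
        self.limit = initial
        self.floor = floor      # never drop below one in-flight request
        self.ceiling = ceiling  # cap to avoid unbounded growth

    def on_response(self, status_code):
        if 200 <= status_code < 300:
            # Additive increase: probe for more capacity gradually.
            self.limit = min(self.limit + 1, self.ceiling)
        elif status_code == 429:
            # Multiplicative decrease: back off quickly when rejected.
            self.limit = max(self.limit // 2, self.floor)
        return self.limit


limiter = ConcurrencyLimiter()
for _ in range(5):
    limiter.on_response(200)   # five successes: limit grows 1 -> 6
print(limiter.limit)           # 6
limiter.on_response(429)       # one rejection halves it: 6 -> 3
print(limiter.limit)           # 3
```

The asymmetry is deliberate: growing slowly avoids overwhelming a function that has just recovered, while halving on a 429 sheds load immediately.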

It is enabled by default in [function mode](#queue-mode) but can be disabled to get the legacy behaviour:

```yaml
jetstreamQueueWorker:
mode: static | function
consumer:
inactiveThreshold: 30s
adaptiveConcurrency: false
```

* If set to `static`, the queue-worker will scale its NATS Consumers based upon the number of replicas of the queue-worker. This is the default mode, and ideal for development, or constrained environments.
**Example**

* If set to `function`, the queue-worker will scale its NATS Consumers based upon the number of functions that are active in the queue. This is ideal for production environments where you want to [scale your functions based upon the queue depth](/reference/autoscaling/). It also gives messages queued at different times a fairer chance of being processed earlier.
Deploy the sleep function with [capacity-based autoscaling](/architecture/autoscaling/#scaling-modes), a `max_inflight` hard limit of 5, and a maximum of 10 replicas:

* The `inactiveThreshold` parameter can be used to set the threshold for when a function is considered inactive. If a function is inactive for longer than the threshold, the queue-worker will delete the NATS Consumer for that function.
```bash
faas-cli store deploy sleep \
--label com.openfaas.scale.max=10 \
--label com.openfaas.scale.target=5 \
--label com.openfaas.scale.type=capacity \
--label com.openfaas.scale.target-proportion=0.9 \
--env max_inflight=5
```

Submit 500 asynchronous invocations:

```bash
hey -m POST -n 500 -c 4 \
http://127.0.0.1:8080/async-function/sleep
```

Without adaptive concurrency, the queue-worker pushes invocations as fast as it can. The single replica accepts the first 5 but rejects the rest with 429 status codes. Those rejected requests are retried with exponential back-off, wasting time and resources.

With adaptive concurrency enabled, the queue-worker backs off as soon as it receives 429 responses, holding the remaining requests in the queue. It periodically probes the function, and as the autoscaler adds replicas to handle the load, the queue-worker detects successful responses and increases the concurrency limit, draining the queue without unnecessary retries.
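You can observe this behaviour while the test runs. Assuming the `sleep` function from the example above was deployed to the default `openfaas-fn` namespace:

```bash
# Watch replicas being added by the autoscaler as load builds up
kubectl get deployment/sleep -n openfaas-fn --watch

# Inspect the function's current replica count and scaling labels
faas-cli describe sleep
```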

### Metrics and monitoring

@@ -122,6 +164,7 @@ jetstreamQueueWorker:
![Structured logs formatted for the console](https://www.openfaas.com/images/2022-07-jetstream-for-openfaas/structured-logs.png)
> Structured logs formatted for the console


## Clear or reset a queue

From time to time, you may wish to reset or purge an async message queue. You may have generated a large number of unnecessary messages that you want to remove from the queue.
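A purge can be done with the NATS CLI. This is a hedged sketch: the stream name depends on your installation, so list the streams first rather than assuming one.

```bash
# Port-forward NATS from the openfaas namespace
kubectl port-forward -n openfaas svc/nats 4222:4222 &

# List the JetStream streams to find the one backing the queue
nats stream ls

# Purge all pending messages from the chosen stream
nats stream purge <stream-name>
```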
15 changes: 14 additions & 1 deletion docs/reference/async.md
@@ -11,6 +11,19 @@ Why might you want to use an asynchronous invocation?

You're working with a partner's webhook. They send you data, but if you don't respond with an HTTP 200 OK within 1 second, they assume failure and retry the message. This works well if your function always completes in under a second, but if there's any risk that it can't, you need another solution. Instead, you give your partner the function's asynchronous URL: it replies within a few milliseconds whilst the data is still being processed.
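For example, compare the two invocation paths for the same function. The function name and callback host below are placeholders; the `X-Callback-Url` header tells OpenFaaS where to deliver the result once processing finishes:

```bash
# Synchronous: blocks until the function completes
curl -i http://127.0.0.1:8080/function/slow-fn \
  -d '{"event": "order.created"}'

# Asynchronous: returns 202 Accepted within milliseconds; the
# optional X-Callback-Url receives the function's result later
curl -i http://127.0.0.1:8080/async-function/slow-fn \
  -H "X-Callback-Url: https://example.com/hooks/result" \
  -d '{"event": "order.created"}'
```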

Other popular use-cases include:

- Batch processing and machine learning
- Resilient data pipelines
- Long running jobs

See these blog posts for examples of asynchronous patterns in practice:

- [Exploring the Fan out and Fan in pattern with OpenFaaS](https://www.openfaas.com/blog/fan-out-and-back-in-using-functions/)
- [Generate PDFs at scale on Kubernetes using OpenFaaS and Puppeteer](https://www.openfaas.com/blog/pdf-generation-at-scale-on-kubernetes/)
- [How to process your data the resilient way with back pressure](https://www.openfaas.com/blog/limits-and-backpressure/)
- [The Next Generation of Queuing: JetStream for OpenFaaS](https://www.openfaas.com/blog/jetstream-for-openfaas/)

### Synchronous vs asynchronous invocation

* Sync
@@ -110,7 +123,7 @@ There are limits for asynchronous functions, which you should understand before
* Concurrency / parallelism - the amount of function invocations processed at any one time.
* Named queues - by default there is one queue, but additional queues can be added. Each named queue can have its own timeout and concurrency.
* Payload size - the maximum size is configured to be 1MB. The limit is defined by NATS, but can be changed. Use a database or an S3 bucket to store large payloads, and pass an identifier to function calls.
* Retries - retries are available in [OpenFaaS Pro](https://openfaas.com/support/) with an exponential back-off.
* Retries - asynchronous invocations can be retried automatically when a function returns an error. Retries use an exponential back-off algorithm. See: [retries](/openfaas-pro/retries).
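As an illustration, an exponential back-off schedule multiplies the delay between attempts up to a cap. The constants here are hypothetical, not the exact values OpenFaaS uses:

```python
def backoff_schedule(initial=0.5, factor=2.0, cap=60.0, attempts=8):
    """Return the wait (in seconds) before each retry attempt.

    Hypothetical parameters for illustration; OpenFaaS exposes its own
    retry configuration - see the retries documentation.
    """
    delays = []
    delay = initial
    for _ in range(attempts):
        delays.append(min(delay, cap))  # never wait longer than the cap
        delay *= factor                 # double the delay each attempt
    return delays


print(backoff_schedule(attempts=5))
# [0.5, 1.0, 2.0, 4.0, 8.0]
```

The cap matters: without it, a handful of retries would quickly reach delays of many minutes, effectively stalling the message.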

The queue-worker uses a single timeout, called `ack_wait`, for how long it will spend processing a message. If your longest function can run for *25 minutes*, then `ack_wait` should be set to at least `25m`.
