Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,42 @@ After setup, Sequin will stream new changes to the sink as they occur in real-ti

Sequin comes with a web console/UI for configuration and monitoring. You can also configure Sequin as code using [YAML config files](https://sequinstream.com/docs/reference/sequin-yaml) and our [Management API](https://sequinstream.com/docs/management-api/introduction).

## AWS IAM authentication

When running on AWS (EKS, ECS, or EC2), Sequin supports IAM-based authentication so you don't need to manage long-lived AWS credentials.

### IRSA (IAM Roles for Service Accounts)

For EKS deployments, Sequin natively supports [IRSA](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html). When the `AWS_ROLE_ARN` and `AWS_WEB_IDENTITY_TOKEN_FILE` environment variables are present (injected automatically by EKS), Sequin will use them to obtain temporary credentials via STS `AssumeRoleWithWebIdentity`. Credentials are refreshed automatically before expiry.

ECS task roles and EC2 instance profiles are also supported.

### Sinks

AWS sinks (SQS, SNS, Kinesis, and Kafka via MSK IAM) support a **"Use task role"** toggle. When enabled, the sink uses IRSA or task role credentials instead of explicit access keys. Configure this in the web console or via YAML:

```yaml
sinks:
- type: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
use_task_role: true
```

### Source databases (RDS)

Postgres source databases hosted on RDS can use [IAM database authentication](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/UsingWithRDS.IAMDBAuth.html) instead of a static password. Enable `use_iam_auth` on the database configuration and provide the `iam_region`. Sequin will generate short-lived auth tokens automatically.

### Sequin's internal database

Sequin's own internal Postgres database (used for configuration and state) can also use RDS IAM auth. Set these environment variables:

| Variable | Description |
|----------|-------------|
| `PG_IAM_AUTH=true` | Enable IAM auth for Sequin's internal database |
| `PG_IAM_REGION` | AWS region of the RDS instance (required when `PG_IAM_AUTH` is enabled) |

When enabled, `PG_PASSWORD` is no longer required. SSL is enforced automatically.

## Why Sequin

We all know Postgres is great for storing and querying data. But what about when you need to stream changes to other systems?
Expand Down
6 changes: 4 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ sequin_stream_schema = "sequin_streams"

config :esbuild, :version, "0.17.11"

# Used by broadway_sqs
config :ex_aws, http_client: ExAws.Request.Req
# Used by broadway_sqs and IRSA credential provider
config :ex_aws,
http_client: ExAws.Request.Req,
json_codec: Jason

config :logger, :console,
format: {Sequin.ConsoleLogger, :format},
Expand Down
57 changes: 45 additions & 12 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ if config_env() == :prod and self_hosted do

backfill_max_pending_messages = ConfigParser.backfill_max_pending_messages(env_vars)

pg_iam_auth? = System.get_env("PG_IAM_AUTH") in ~w(true 1)
pg_iam_region = System.get_env("PG_IAM_REGION")

if pg_iam_auth? and is_nil(pg_iam_region) do
raise "PG_IAM_REGION is required when PG_IAM_AUTH is enabled"
end

database_url =
case System.get_env("PG_URL") do
nil ->
Expand All @@ -120,14 +127,30 @@ if config_env() == :prod and self_hosted do
username = System.get_env("PG_USERNAME")
password = System.get_env("PG_PASSWORD")

if Enum.all?([hostname, database, port, username, password], &(not is_nil(&1))) do
"postgres://#{username}:#{password}@#{hostname}:#{port}/#{database}"
# When using IAM auth, password is not required — a token is generated at connection time
required_vars = [hostname, database, port, username]

if pg_iam_auth? do
if Enum.all?(required_vars, &(not is_nil(&1))) do
# Use a placeholder password in the URL; it will be replaced by the IAM token
"postgres://#{username}:iam-placeholder@#{hostname}:#{port}/#{database}"
else
raise """
Missing PostgreSQL connection information.
When PG_IAM_AUTH is enabled, provide either PG_URL or all of the following:
PG_HOSTNAME, PG_DATABASE, PG_PORT, PG_USERNAME
"""
end
else
raise """
Missing PostgreSQL connection information.
Please provide either PG_URL or all of the following environment variables:
PG_HOSTNAME, PG_DATABASE, PG_PORT, PG_USERNAME, PG_PASSWORD
"""
if Enum.all?([password | required_vars], &(not is_nil(&1))) do
"postgres://#{username}:#{password}@#{hostname}:#{port}/#{database}"
else
raise """
Missing PostgreSQL connection information.
Please provide either PG_URL or all of the following environment variables:
PG_HOSTNAME, PG_DATABASE, PG_PORT, PG_USERNAME, PG_PASSWORD
"""
end
end

url ->
Expand Down Expand Up @@ -163,17 +186,27 @@ if config_env() == :prod and self_hosted do
:ok
end

repo_opts = [
ssl: if(pg_iam_auth?, do: [verify: :verify_none], else: repo_ssl),
pool_size: String.to_integer(System.get_env("PG_POOL_SIZE", "10")),
url: database_url,
socket_options: ConfigParser.ecto_socket_opts(env_vars)
]

repo_opts =
if pg_iam_auth? do
Keyword.put(repo_opts, :configure, {Sequin.Aws.RepoIamAuth, :configure, [pg_iam_region]})
else
repo_opts
end

config :sequin, Sequin.Posthog,
req_opts: [base_url: "https://us.i.posthog.com"],
api_key: "phc_i9k28nZwjjJG9DzUK0gDGASxXtGNusdI1zdaz9cuA7h",
frontend_api_key: "phc_i9k28nZwjjJG9DzUK0gDGASxXtGNusdI1zdaz9cuA7h",
is_disabled: System.get_env("SEQUIN_TELEMETRY_DISABLED") in ~w(true 1)

config :sequin, Sequin.Repo,
ssl: repo_ssl,
pool_size: String.to_integer(System.get_env("PG_POOL_SIZE", "10")),
url: database_url,
socket_options: ConfigParser.ecto_socket_opts(env_vars)
config :sequin, Sequin.Repo, repo_opts

config :sequin, SequinWeb.Endpoint,
# `url` is used for configuring links in the console. So it corresponds to the *external*
Expand Down
1 change: 1 addition & 0 deletions lib/sequin/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ defmodule Sequin.Application do
Sequin.Redis.connect_cluster()

[
Sequin.Aws.IrsaCredentials,
Sequin.Repo,
Sequin.Vault,
Sequin.PubSub.child_spec(),
Expand Down
10 changes: 10 additions & 0 deletions lib/sequin/aws/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Sequin.Aws.Client do

@behaviour Sequin.Aws

alias Sequin.Aws.IrsaCredentials
alias Sequin.Error

require Logger
Expand All @@ -34,6 +35,15 @@ defmodule Sequin.Aws.Client do
end

defp get_credentials do
# IRSA (Kubernetes/EKS) takes priority if available
if IrsaCredentials.available?() do
IrsaCredentials.get_credentials()
else
get_credentials_from_metadata()
end
end

defp get_credentials_from_metadata do
case Application.ensure_all_started(:aws_credentials) do
{:ok, _} ->
case :aws_credentials.get_credentials() do
Expand Down
221 changes: 221 additions & 0 deletions lib/sequin/aws/irsa_credentials.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
defmodule Sequin.Aws.IrsaCredentials do
@moduledoc """
GenServer that manages IRSA (IAM Roles for Service Accounts) credentials for EKS.

Reads the projected JWT from `AWS_WEB_IDENTITY_TOKEN_FILE`, calls
`STS:AssumeRoleWithWebIdentity` via `ex_aws_sts`, and refreshes the resulting
temporary credentials before they expire.

The GenServer only starts when both `AWS_ROLE_ARN` and `AWS_WEB_IDENTITY_TOKEN_FILE`
environment variables are set. Otherwise it returns `:ignore` from `init/1`.
"""

use GenServer

alias Sequin.Error

require Logger

@default_session_name "sequin-irsa"
# Refresh at 80% of credential TTL
@refresh_ratio 0.8
# Minimum refresh interval: 60 seconds
@min_refresh_ms to_timeout(minute: 1)
# Retry interval on failure: 30 seconds
@retry_ms to_timeout(second: 30)

defmodule State do
@moduledoc false
use TypedStruct

typedstruct do
field :role_arn, String.t(), enforce: true
field :token_file, String.t(), enforce: true
field :session_name, String.t(), enforce: true
field :credentials, map()
field :expires_at, DateTime.t()
field :refresh_timer, reference()
end
end

# -- Public API --

def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, opts, name: name)
end

@doc """
Returns true if the IRSA GenServer is running and has credentials available.
"""
@spec available?() :: boolean()
def available? do
case GenServer.whereis(__MODULE__) do
nil -> false
pid -> Process.alive?(pid) and GenServer.call(pid, :available?)
end
catch
:exit, _ -> false
end

@doc """
Returns the current IRSA credentials.

Returns `{:ok, %{access_key_id: ..., secret_access_key: ..., token: ...}}`
or `{:error, reason}`.
"""
@spec get_credentials() :: {:ok, map()} | {:error, Error.t()}
def get_credentials do
GenServer.call(__MODULE__, :get_credentials)
catch
:exit, _ ->
{:error, Error.service(service: :aws, message: "IRSA credentials server not available")}
end

# -- GenServer callbacks --

@impl GenServer
def init(opts) do
role_arn = Keyword.get(opts, :role_arn) || System.get_env("AWS_ROLE_ARN")
token_file = Keyword.get(opts, :token_file) || System.get_env("AWS_WEB_IDENTITY_TOKEN_FILE")
session_name = Keyword.get(opts, :session_name) || System.get_env("AWS_ROLE_SESSION_NAME", @default_session_name)

if is_nil(role_arn) or is_nil(token_file) do
:ignore
else
state = %State{
role_arn: role_arn,
token_file: token_file,
session_name: session_name
}

# Fetch credentials immediately on startup
case fetch_credentials(state) do
{:ok, new_state} ->
{:ok, new_state}

{:error, reason} ->
Logger.error("IRSA initial credential fetch failed: #{inspect(reason)}. Will retry.")
timer = Process.send_after(self(), :refresh, @retry_ms)
{:ok, %{state | refresh_timer: timer}}
end
end
end

@impl GenServer
def handle_call(:available?, _from, %State{credentials: credentials} = state) do
{:reply, not is_nil(credentials), state}
end

@impl GenServer
def handle_call(:get_credentials, _from, %State{credentials: nil} = state) do
{:reply, {:error, Error.service(service: :aws, message: "IRSA credentials not yet available")}, state}
end

@impl GenServer
def handle_call(:get_credentials, _from, %State{credentials: credentials} = state) do
{:reply, {:ok, credentials}, state}
end

@impl GenServer
def handle_info(:refresh, state) do
case fetch_credentials(state) do
{:ok, new_state} ->
{:noreply, new_state}

{:error, reason} ->
Logger.error("IRSA credential refresh failed: #{inspect(reason)}. Will retry in #{@retry_ms}ms.")
timer = Process.send_after(self(), :refresh, @retry_ms)
{:noreply, %{state | refresh_timer: timer}}
end
end

# -- Private --

defp fetch_credentials(%State{} = state) do
with {:ok, token} <- read_token_file(state.token_file),
{:ok, result} <- assume_role_with_web_identity(state.role_arn, token, state.session_name) do
credentials = %{
access_key_id: result.access_key_id,
secret_access_key: result.secret_access_key,
token: result.session_token
}

expires_at = result.expiration
refresh_ms = calculate_refresh_ms(expires_at)

if state.refresh_timer, do: Process.cancel_timer(state.refresh_timer)
timer = Process.send_after(self(), :refresh, refresh_ms)

Logger.info("IRSA credentials fetched successfully. Next refresh in #{div(refresh_ms, 1000)}s.")

{:ok, %{state | credentials: credentials, expires_at: expires_at, refresh_timer: timer}}
end
end

defp read_token_file(path) do
case File.read(path) do
{:ok, token} ->
{:ok, String.trim(token)}

{:error, reason} ->
{:error, Error.service(service: :aws, message: "Failed to read IRSA token file #{path}: #{inspect(reason)}")}
end
end

defp assume_role_with_web_identity(role_arn, token, session_name) do
op =
ExAws.STS.assume_role_with_web_identity(
role_arn,
session_name,
token,
duration: 3600
)

# AssumeRoleWithWebIdentity doesn't require pre-existing AWS credentials —
# the web identity token IS the authentication. We pass dummy credentials
# to satisfy ExAws's credential resolution (same approach as the built-in
# AssumeRoleWebIdentityAdapter).
config = %{access_key_id: "not-used", secret_access_key: "not-used"}

case ExAws.request(op, config) do
{:ok, %{body: body}} ->
parse_assume_role_response(body)

{:error, {:http_error, status, %{body: body}}} ->
{:error, Error.service(service: :aws, message: "STS AssumeRoleWithWebIdentity failed (HTTP #{status}): #{body}")}

{:error, reason} ->
{:error, Error.service(service: :aws, message: "STS AssumeRoleWithWebIdentity failed: #{inspect(reason)}")}
end
end

defp parse_assume_role_response(body) do
credentials = body[:credentials]

if credentials do
expiration =
case DateTime.from_iso8601(credentials[:expiration]) do
{:ok, dt, _offset} -> dt
_ -> DateTime.add(DateTime.utc_now(), 3600, :second)
end

{:ok,
%{
access_key_id: credentials[:access_key_id],
secret_access_key: credentials[:secret_access_key],
session_token: credentials[:session_token],
expiration: expiration
}}
else
{:error, Error.service(service: :aws, message: "Unexpected STS response format: #{inspect(body)}")}
end
end

defp calculate_refresh_ms(expires_at) do
now = DateTime.utc_now()
ttl_seconds = DateTime.diff(expires_at, now, :second)
refresh_seconds = trunc(ttl_seconds * @refresh_ratio)
max(refresh_seconds * 1000, @min_refresh_ms)
end
end
Loading
Loading