Skip to content

Commit 4ffd44f

Browse files
committed
🗜️ Add Kafka compression options
Verification e2e for all compression algorithms: `docker exec sequin-dev-kafka-1 kafka-dump-log --files /var/lib/kafka/data/records-0/00000000000000000000.log --print-data-log 2>&1 | grep "^baseOffset"` ┌─────────┬─────────────┬──────────────┐ │ Offsets │ Compression │ Timestamp │ ├─────────┼─────────────┼──────────────┤ │ 0–5 │ lz4 │ Feb 17 22:34 │ ├─────────┼─────────────┼──────────────┤ │ 6–11 │ lz4 │ Feb 17 23:07 │ ├─────────┼─────────────┼──────────────┤ │ 12–17 │ gzip │ Feb 18 18:29 │ ├─────────┼─────────────┼──────────────┤ │ 18–23 │ snappy │ Feb 18 18:32 │ ├─────────┼─────────────┼──────────────┤ │ 24–29 │ zstd │ Feb 18 18:32 │ └─────────┴─────────────┴──────────────┘
1 parent 713cc02 commit 4ffd44f

7 files changed

Lines changed: 23 additions & 10 deletions

File tree

assets/svelte/consumers/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ export type KafkaConsumer = BaseConsumer & {
176176
tls: boolean;
177177
topic: string;
178178
sasl_mechanism: null | "plain" | "scram_sha_256" | "scram_sha_512";
179-
compression: "none" | "lz4";
179+
compression: "none" | "gzip" | "snappy" | "lz4" | "zstd";
180180
};
181181
};
182182

assets/svelte/sinks/kafka/KafkaSinkCard.svelte

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,16 @@
6060
<span
6161
class="font-mono bg-slate-50 py-1 px-2 border border-slate-100 rounded-md whitespace-nowrap"
6262
>
63-
{#if consumer.sink.compression === "lz4"}
64-
LZ4
65-
{:else if consumer.sink.compression === "none"}
63+
{#if consumer.sink.compression === "none"}
6664
None
65+
{:else if consumer.sink.compression === "gzip"}
66+
Gzip
67+
{:else if consumer.sink.compression === "snappy"}
68+
Snappy
69+
{:else if consumer.sink.compression === "lz4"}
70+
LZ4
71+
{:else if consumer.sink.compression === "zstd"}
72+
Zstd
6773
{:else}
6874
{consumer.sink.compression}
6975
{/if}

assets/svelte/sinks/kafka/KafkaSinkForm.svelte

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,10 @@
245245
class="block w-full border border-gray-300 rounded-md p-2"
246246
>
247247
<option value="none">None</option>
248+
<option value="gzip">Gzip</option>
249+
<option value="snappy">Snappy</option>
248250
<option value="lz4">LZ4</option>
251+
<option value="zstd">Zstd</option>
249252
</select>
250253
<p class="text-sm text-muted-foreground">
251254
Compression algorithm for Kafka producer messages.

docs/reference/sequin-yaml.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ destination:
363363
username: "kafka-user" # Optional, for SASL authentication
364364
password: "kafka-pass" # Optional, for SASL authentication
365365
sasl_mechanism: "plain" # Optional: plain, scram_sha_256, scram_sha_512
366-
compression: "lz4" # Optional: none (default), lz4
366+
compression: "lz4" # Optional: none (default), gzip, snappy, lz4, zstd
367367
batch_size: 50 # Optional, messages per batch
368368
```
369369

lib/sequin/consumers/kafka_sink.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ defmodule Sequin.Consumers.KafkaSink do
2525
field :aws_secret_access_key, EncryptedField
2626
field :connection_id, :string
2727
field :routing_mode, Ecto.Enum, values: [:dynamic, :static]
28-
field :compression, Ecto.Enum, values: [:none, :lz4], default: :none
28+
field :compression, Ecto.Enum, values: [:none, :gzip, :snappy, :lz4, :zstd], default: :none
2929
end
3030

3131
def changeset(struct, params) do

mix.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@ defmodule Sequin.MixProject do
101101
{:gnat, "~> 1.9"},
102102
{:amqp, "~> 4.1"},
103103
{:amqp_client, "~> 4.2"},
104-
{:brod, "~> 4.3"},
104+
{:brod, "~> 4.5"},
105105
{:lz4b, "~> 0.0.13"},
106+
{:snappyer, "~> 1.2"},
107+
{:ezstd, "~> 1.1"},
106108

107109
# Caching and State Management
108110
{:con_cache, "~> 1.1"},

mix.lock

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"broadway": {:hex, :broadway, "1.2.1", "83a1567423c26885e15f6cd8670ca790370af2fcff2ede7fa88c5ea793087a67", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "68ae63d83b55bdca0f95cd49feee5fb74c5a6bec557caf940860fe07dbc8a4fb"},
1515
"broadway_dashboard": {:hex, :broadway_dashboard, "0.4.1", "a5f4cba542390ba1cb6f0d26401676312adc8330f664e7aec98b498057d757a2", [:mix], [{:broadway, "~> 1.0", [hex: :broadway, repo: "hexpm", optional: false]}, {:phoenix_live_dashboard, "~> 0.8.0", [hex: :phoenix_live_dashboard, repo: "hexpm", optional: false]}], "hexpm", "2177b4d4ab46bdc059613fa447219c974091cfc79d9ff16480a9f4aabab89020"},
1616
"broadway_sqs": {:git, "https://github.com/dashbitco/broadway_sqs.git", "94ccc7e079eb0bb44f20752f233bad1480e00c34", [ref: "94ccc7e"]},
17-
"brod": {:hex, :brod, "4.3.2", "51f4dff17ed43a806558ebd62cc88e7b35aed336d1ba1f3de2d010f463d49736", [:rebar3], [{:kafka_protocol, "4.1.10", [hex: :kafka_protocol, repo: "hexpm", optional: false]}], "hexpm", "88584fdeba746aa6729e2a1826416c10899954f68af93659b3c2f38a2dcaa27c"},
17+
"brod": {:hex, :brod, "4.5.2", "66033736c7808c01e544ad0632ce2f5cf6b0e02dbe2e9b698a0b468a417cac23", [:cmake, :rebar3], [{:kafka_protocol, "4.3.2", [hex: :kafka_protocol, repo: "hexpm", optional: false]}], "hexpm", "90f4257195de79ae782b33044f45602162e72c790c66c78393053a03b3616cb5"},
1818
"castore": {:hex, :castore, "1.0.14", "4582dd7d630b48cf5e1ca8d3d42494db51e406b7ba704e81fbd401866366896a", [:mix], [], "hexpm", "7bc1b65249d31701393edaaac18ec8398d8974d52c647b7904d01b964137b9f4"},
1919
"certifi": {:hex, :certifi, "2.15.0", "0e6e882fcdaaa0a5a9f2b3db55b1394dba07e8d6d9bcad08318fb604c6839712", [:rebar3], [], "hexpm", "b147ed22ce71d72eafdad94f055165c1c182f61a2ff49df28bcc71d1d5b94a60"},
2020
"chacha20": {:hex, :chacha20, "1.0.4", "0359d8f9a32269271044c1b471d5cf69660c362a7c61a98f73a05ef0b5d9eb9e", [:mix], [], "hexpm", "2027f5d321ae9903f1f0da7f51b0635ad6b8819bc7fe397837930a2011bc2349"},
@@ -26,7 +26,7 @@
2626
"con_cache": {:hex, :con_cache, "1.1.0", "45c7c6cd6dc216e47636232e8c683734b7fe293221fccd9454fa1757bc685044", [:mix], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "8655f2ae13a1e56c8aef304d250814c7ed929c12810f126fc423ecc8e871593b"},
2727
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
2828
"cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"},
29-
"crc32cer": {:hex, :crc32cer, "0.1.11", "b550da6d615feb72a882d15d020f8f7dee72dfb2cb1bcdf3b1ee8dc2afd68cfc", [:rebar3], [], "hexpm", "a39b8f0b1990ac1bf06c3a247fc6a178b740cdfc33c3b53688dc7dd6b1855942"},
29+
"crc32cer": {:hex, :crc32cer, "1.1.2", "e2232fe8d4c1ef99f564633ab8612d352533c2b94c780965742a2f1eeeba6a0f", [:rebar3], [], "hexpm", "e2c9c69703385ec92ea7e00d4f8b1870549e6f13c3888d18f8f483a951147fbb"},
3030
"credentials_obfuscation": {:hex, :credentials_obfuscation, "3.5.0", "61e282adfb4439486b3994faaec69543c7ee6cc7e70c6340e8853fd9deaf8219", [:rebar3], [], "hexpm", "843adbe3246861ce0f1a0fa3222f384834eb31defd8d6b9cba7afd2977c957bc"},
3131
"curve25519": {:hex, :curve25519, "1.0.5", "f801179424e4012049fcfcfcda74ac04f65d0ffceeb80e7ef1d3352deb09f5bb", [:mix], [], "hexpm", "0fba3ad55bf1154d4d5fc3ae5fb91b912b77b13f0def6ccb3a5d58168ff4192d"},
3232
"db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"},
@@ -48,6 +48,7 @@
4848
"ex_aws": {:hex, :ex_aws, "2.5.9", "8e2455172f0e5cbe2f56dd68de514f0dae6bb26d6b6e2f435a06434cf9dbb412", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:req, "~> 0.5.10 or ~> 0.6 or ~> 1.0", [hex: :req, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cbdb6ffb0e6c6368de05ed8641fe1376298ba23354674428e5b153a541f23359"},
4949
"ex_aws_sqs": {:hex, :ex_aws_sqs, "3.4.0", "f7c4d0177c1c954776363d3dc05e5dfd37ddf0e2c65ec3f047e5c9c7dd1b71ac", [:mix], [{:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:saxy, "~> 1.1", [hex: :saxy, repo: "hexpm", optional: true]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "b504482206ccaf767b714888e9d41a1cfcdcb241577985517114191c812f155a"},
5050
"expo": {:hex, :expo, "0.5.2", "beba786aab8e3c5431813d7a44b828e7b922bfa431d6bfbada0904535342efe2", [:mix], [], "hexpm", "8c9bfa06ca017c9cb4020fabe980bc7fdb1aaec059fd004c2ab3bff03b1c599c"},
51+
"ezstd": {:hex, :ezstd, "1.2.3", "98748f4099e6e2a067f77ace43041ebaa53c13194b08ce22370e4c93079e9e16", [:rebar3], [], "hexpm", "de32e0b41ba36a9ed46db8215da74777d2f141bb75f67bfc05dbb4b7c3386dee"},
5152
"faker": {:hex, :faker, "0.18.0", "943e479319a22ea4e8e39e8e076b81c02827d9302f3d32726c5bf82f430e6e14", [:mix], [], "hexpm", "bfbdd83958d78e2788e99ec9317c4816e651ad05e24cfd1196ce5db5b3e81797"},
5253
"file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"},
5354
"finch": {:hex, :finch, "0.20.0", "5330aefb6b010f424dcbbc4615d914e9e3deae40095e73ab0c1bb0968933cadf", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2658131a74d051aabfcba936093c903b8e89da9a1b63e430bee62045fa9b2ee2"},
@@ -65,7 +66,7 @@
6566
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
6667
"jose": {:hex, :jose, "1.11.10", "a903f5227417bd2a08c8a00a0cbcc458118be84480955e8d251297a425723f83", [:mix, :rebar3], [], "hexpm", "0d6cd36ff8ba174db29148fc112b5842186b68a90ce9fc2b3ec3afe76593e614"},
6768
"jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"},
68-
"kafka_protocol": {:hex, :kafka_protocol, "4.1.10", "f917b6c90c8df0de2b40a87d6b9ae1cfce7788e91a65818e90e40cf76111097a", [:rebar3], [{:crc32cer, "0.1.11", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "df680a3706ead8695f8b306897c0a33e8063c690da9308db87b462cfd7029d04"},
69+
"kafka_protocol": {:hex, :kafka_protocol, "4.3.2", "a814a6f8c373ed86e87fd9fdc891787711c6db4f4c9f000aa4c7bc47641cc2ed", [:rebar3], [{:crc32cer, "1.1.2", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "213fbba9442404b4d2e6233ba5090e527fd92fbf43831139021207ba05e9642f"},
6970
"kcl": {:hex, :kcl, "1.4.2", "8b73a55a14899dc172fcb05a13a754ac171c8165c14f65043382d567922f44ab", [:mix], [{:curve25519, ">= 1.0.4", [hex: :curve25519, repo: "hexpm", optional: false]}, {:ed25519, "~> 1.3", [hex: :ed25519, repo: "hexpm", optional: false]}, {:poly1305, "~> 1.0", [hex: :poly1305, repo: "hexpm", optional: false]}, {:salsa20, "~> 1.0", [hex: :salsa20, repo: "hexpm", optional: false]}], "hexpm", "9f083dd3844d902df6834b258564a82b21a15eb9f6acdc98e8df0c10feeabf05"},
7071
"libcluster": {:hex, :libcluster, "3.4.1", "271d2da892763bbef53c2872036c936fe8b80111eb1feefb2d30a3bb15c9b4f6", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.3", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1d568157f069c6afa70ec0d736704cf799734bdbb6343f0322af4a980301c853"},
7172
"live_svelte": {:hex, :live_svelte, "0.13.3", "54e7c55d30b5b143674011d32c85bfbba9f38f7684f6dbc58a39b082606874da", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:nodejs, "~> 2.0", [hex: :nodejs, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.7.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, ">= 3.3.1", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:phoenix_live_view, ">= 0.18.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}], "hexpm", "57dbc0e136d0db20bc0bd8282bbb4d4be3972359c47a787c144b09269f23851b"},
@@ -116,6 +117,7 @@
116117
"salsa20": {:hex, :salsa20, "1.0.4", "404cbea1fa8e68a41bcc834c0a2571ac175580fec01cc38cc70c0fb9ffc87e9b", [:mix], [], "hexpm", "745ddcd8cfa563ddb0fd61e7ce48d5146279a2cf7834e1da8441b369fdc58ac6"},
117118
"saxy": {:hex, :saxy, "1.6.0", "02cb4e9bd045f25ac0c70fae8164754878327ee393c338a090288210b02317ee", [:mix], [], "hexpm", "ef42eb4ac983ca77d650fbdb68368b26570f6cc5895f0faa04d34a6f384abad3"},
118119
"sentry": {:hex, :sentry, "10.8.1", "aa45309785e1521416225adb16e0b4d8b957578804527f3c7babb6fefbc5e456", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0 or ~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "495b3cdadad90ba72eef973aa3dec39b3b8b2a362fe87e2f4ef32133ac3b4097"},
120+
"snappyer": {:hex, :snappyer, "1.2.10", "023e9ae00e969b0997208b5de7d3b12bb46ec6bc5411e8dc53e7b3f435b8f0fd", [:rebar3], [], "hexpm", "f55bd9ed147e7163cb3acd1e431a7ff2c9e31ceacbb8308786094fb64551c284"},
119121
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
120122
"statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
121123
"stillir": {:hex, :stillir, "1.0.0", "9e77eaadd2418a61ec7398c01e29dea26d14f51c42e0b309084493e3ed33337a", [:rebar3], [], "hexpm", "04afdee2e5123b6da11fcc28c38d581f74db0cbe1faa1c36ed4f364797b588c0"},

0 commit comments

Comments
 (0)