Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 32 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,22 @@ Example: consume credits only once per account.
```ruby
account_scope = event_store.read.with_tag("account:x")

begin
event_store.append(
[
En57::Event.new(
type: "CreditsUsed",
data: { amount: 100 },
tags: ["account:x"],
),
],
fail_if: account_scope.of_type("CreditsUsed"),
)
rescue En57::AppendConditionViolated
# lost the race; another writer already consumed credits
result = event_store.append(
[
En57::Event.new(
type: "CreditsUsed",
data: { amount: 100 },
tags: ["account:x"],
),
],
fail_if: account_scope.of_type("CreditsUsed"),
)

case result
in En57::Success(position:)
# credits consumed at event position
in En57::Failure(position:)
# lost the race; another writer already consumed credits at position
end
```

Expand All @@ -144,18 +147,21 @@ Example: ensure no event exists with this email tag before writing.
```ruby
email_tag = "email:alice@example.com"

begin
event_store.append(
[
En57::Event.new(
type: "UserRegistered",
data: { name: "Alice" },
tags: [email_tag],
),
],
fail_if: event_store.read.with_tag(email_tag),
)
rescue En57::AppendConditionViolated
# email already used
result = event_store.append(
[
En57::Event.new(
type: "UserRegistered",
data: { name: "Alice" },
tags: [email_tag],
),
],
fail_if: event_store.read.with_tag(email_tag),
)

case result
in En57::Success(position:)
# user registered at event position
in En57::Failure(position:)
# email already used at event position
end
```
105 changes: 63 additions & 42 deletions db/schema/0.1.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@ CREATE TYPE en57.event AS (
tags text[]
);

CREATE TYPE en57.append_result AS (
status text,
"position" bigint
);

CREATE FUNCTION en57.append_events (new_events en57.event[], append_condition jsonb DEFAULT '{}'::jsonb)
RETURNS void
RETURNS en57.append_result
LANGUAGE plpgsql
SET enable_seqscan = OFF
AS $$
Expand All @@ -39,6 +44,8 @@ DECLARE
req_types text[];
req_tags text[];
req_after bigint;
appended_position bigint;
matched_position bigint;
BEGIN
FOREACH criterion IN ARRAY criteria LOOP
req_after := (criterion ->> 'after')::bigint;
Expand All @@ -48,56 +55,70 @@ BEGIN
req_tags := ARRAY ( SELECT DISTINCT
jsonb_array_elements_text(COALESCE(criterion -> 'tags', '[]'::jsonb)));
IF cardinality(req_tags) > 0 THEN
IF EXISTS (
SELECT
max(e.position)
FROM (
SELECT
1
FROM (
SELECT
t.event_id
FROM
en57.tags AS t
WHERE
t.value = ANY (req_tags)
GROUP BY
t.event_id
HAVING
count(*) = cardinality(req_tags)) AS matched
JOIN en57.events AS e ON e.id = matched.event_id
WHERE (req_after IS NULL
OR e.position > req_after)
t.event_id
FROM
en57.tags AS t
WHERE
t.value = ANY (req_tags)
GROUP BY
t.event_id
HAVING
count(*) = cardinality(req_tags)) AS matched
JOIN en57.events AS e ON e.id = matched.event_id
WHERE (req_after IS NULL
OR e.position > req_after)
AND (criterion -> 'types' IS NULL
OR e.type = ANY (req_types))) THEN
RAISE EXCEPTION 'append_condition_violated';
END IF;
ELSE
IF EXISTS (
OR e.type = ANY (req_types))
INTO
matched_position;
ELSE
SELECT
1
max(e.position)
FROM
en57.events AS e
WHERE (req_after IS NULL
OR e.position > req_after)
AND (criterion -> 'types' IS NULL
OR e.type = ANY (req_types))) THEN
RAISE EXCEPTION 'append_condition_violated';
END IF;
END IF;
END LOOP;
AND (criterion -> 'types' IS NULL
OR e.type = ANY (req_types))
INTO
matched_position;
END IF;
IF matched_position IS NOT NULL THEN
RETURN ROW ('append_condition_violated',
matched_position)::en57.append_result;
END IF;
END LOOP;
WITH inserted_events AS (
INSERT INTO en57.events (id, type, data, meta)
SELECT
e.id,
e.type,
e.data,
e.meta
FROM
unnest(new_events) AS e;
INSERT INTO en57.tags (event_id, value)
SELECT
e.id,
t.value
FROM
unnest(new_events) AS e
SELECT
e.id,
e.type,
e.data,
e.meta
FROM
unnest(new_events) AS e
RETURNING
"position"
)
SELECT
max("position")
FROM
inserted_events
INTO
appended_position;
INSERT INTO en57.tags (event_id, value)
SELECT
e.id,
t.value
FROM
unnest(new_events) AS e
CROSS JOIN LATERAL unnest(COALESCE(e.tags, ARRAY[]::text[])) AS t (value);
RETURN ROW ('success',
appended_position)::en57.append_result;
END;
$$;

Expand Down
13 changes: 8 additions & 5 deletions lib/benchmark/append_non_conflicting_tags.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ def call(measure, retries, run_id)
.with_tag(tags = ["writer:#{run_id}"])
events = @batch_size.times.map { Event.new(type:, tags:) }

position = 0
measure.call do
begin
@event_store.append(events, fail_if: scope.after(position = 0))
rescue AppendConditionViolated
retries.call
retry
loop do
case @event_store.append(events, fail_if: scope.after(position))
in Success
break
in Failure[position:]
retries.call
end
end
end
end
Expand Down
14 changes: 7 additions & 7 deletions lib/benchmark/concurrent_append_conflicting_tags.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ def call(measure, retries, run_id)
events = @batch_size.times.map { Event.new(type:, tags:) }
barrier.wait

position = 0
measure.call do
begin
@event_store.append(events, fail_if: scope.after(position = 0))
rescue AppendConditionViolated
retries.call
scope.each_with_position do |_event, event_position|
position = event_position
loop do
case @event_store.append(events, fail_if: scope.after(position))
in Success
break
in Failure[position:]
retries.call
end
retry
end
end
end
Expand Down
8 changes: 2 additions & 6 deletions lib/benchmark/concurrent_append_non_conflicting_tags.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,9 @@ def call(measure, retries, _run_id)
events = @batch_size.times.map { Event.new(type:, tags:) }
barrier.wait

position = 0
measure.call do
begin
@event_store.append(events, fail_if: scope.after(position = 0))
rescue AppendConditionViolated
retries.call
retry
end
@event_store.append(events, fail_if: scope.after(position))
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/en57.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
require_relative "en57/configuration"

module En57
AppendConditionViolated = Class.new(StandardError)
Success = Data.define(:position)
Failure = Data.define(:position)
AppendRetriesExhausted = Class.new(StandardError)

def self.configuration = Configuration.instance

Expand Down
2 changes: 2 additions & 0 deletions lib/en57/active_record_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def with_transaction(&block) = run_transaction({}, &block)
def with_serializable_transaction(&block) =
run_transaction({ isolation: :serializable }, &block)

def serialization_error = ActiveRecord::SerializationFailure

private

def run_transaction(options)
Expand Down
5 changes: 5 additions & 0 deletions lib/en57/benchmark.rb
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ def initialize(scenarios:, formatter:)
end

def run
original_append_retries = En57.configuration.append_retries
En57.configuration.append_retries = 100

results =
@scenarios.map do |instance_name, mk_scenario|
PgEphemeral.with_server(instance_name:) do |server|
Expand Down Expand Up @@ -283,6 +286,8 @@ def run
end

@formatter.format(results)
ensure
En57.configuration.append_retries = original_append_retries
end
end

Expand Down
3 changes: 2 additions & 1 deletion lib/en57/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ module En57
class Configuration
include Singleton

attr_accessor :serializer
attr_accessor :append_retries, :serializer

def initialize
@append_retries = 9
@serializer = JsonSerializer.new
end
end
Expand Down
1 change: 0 additions & 1 deletion lib/en57/event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ def initialize(repository)

def append(events, fail_if: EmptyScope.new)
@repository.append(events, fail_if: fail_if.to_query)
self
end

def read
Expand Down
16 changes: 10 additions & 6 deletions lib/en57/pg_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ def with_transaction(&block) = run_transaction("BEGIN", &block)
def with_serializable_transaction(&block) =
run_transaction("BEGIN ISOLATION LEVEL SERIALIZABLE", &block)

def serialization_error = PG::TRSerializationFailure

private

def run_transaction(begin_statement)
with_connection do |connection|
connection.exec(begin_statement)
begin
yield connection
rescue StandardError
connection.exec("ROLLBACK")
raise
end
result =
begin
yield connection
rescue StandardError
connection.exec("ROLLBACK")
raise
end
connection.exec("COMMIT")
result
end
end

Expand Down
Loading