Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions lib/posthog/sender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule PostHog.Sender do
:api_client,
:max_batch_time_ms,
:max_batch_events,
:timer_ref,
events: [],
num_events: 0
]
Expand Down Expand Up @@ -74,24 +75,28 @@ defmodule PostHog.Sender do
def handle_cast({:event, event}, state) do
case state do
%{num_events: n, events: events} when n + 1 >= state.max_batch_events ->
if state.timer_ref, do: Process.cancel_timer(state.timer_ref, async: true, info: false)

{:noreply, %{state | events: [event | events], num_events: n + 1},
{:continue, :send_batch}}

%{num_events: 0, events: events} ->
Process.send_after(self(), :batch_time_reached, state.max_batch_time_ms)
ref = Process.send_after(self(), :batch_time_reached, state.max_batch_time_ms)

{:noreply, %{state | events: [event | events], num_events: 1}}
{:noreply, %{state | events: [event | events], num_events: 1, timer_ref: ref}}

%{num_events: n, events: events} ->
{:noreply, %{state | events: [event | events], num_events: n + 1}}
end
end

@impl GenServer
def handle_info(:batch_time_reached, state) do
def handle_info(:batch_time_reached, %{num_events: n} = state) when n > 0 do
{:noreply, state, {:continue, :send_batch}}
end

def handle_info(:batch_time_reached, state), do: {:noreply, state}

@impl GenServer
def handle_continue(:send_batch, state) do
# Before we initiate an HTTP request that might block the process
Expand All @@ -101,7 +106,7 @@ defmodule PostHog.Sender do
Registry.update_value(state.registry, registry_key(state.index), fn _ -> :busy end)
PostHog.API.batch(state.api_client, state.events)
Registry.update_value(state.registry, registry_key(state.index), fn _ -> :available end)
{:noreply, %{state | events: [], num_events: 0}}
{:noreply, %{state | events: [], num_events: 0, timer_ref: nil}}
end

@impl GenServer
Expand Down
52 changes: 50 additions & 2 deletions test/posthog/sender_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ defmodule PostHog.SenderTest do
assert %{events: ["my_event"]} = :sys.get_state(pid)
end

test "immediately sends after reaching max_batch_events", %{
test "immediately sends after reaching max_batch_events and cancels timer", %{
api_client: api_client,
registry: registry
} do
Expand Down Expand Up @@ -129,14 +129,18 @@ defmodule PostHog.SenderTest do
end)

Sender.send("foo", @supervisor_name)

%{timer_ref: ref} = :sys.get_state(pid)
assert is_reference(ref)

Sender.send("bar", @supervisor_name)

assert_receive :ready

[{^pid, :busy}] = Registry.lookup(registry, {PostHog.Sender, 1})
send(pid, :go)

assert %{events: []} = :sys.get_state(pid)
assert %{events: [], timer_ref: nil} = :sys.get_state(pid)
[{^pid, :available}] = Registry.lookup(registry, {PostHog.Sender, 1})
end

Expand Down Expand Up @@ -207,5 +211,49 @@ defmodule PostHog.SenderTest do

assert :ok = GenServer.stop(pid)
end

test "does not send empty batch", %{api_client: api_client, registry: registry} do
test_pid = self()

pid =
start_link_supervised!(
{Sender,
supervisor_name: @supervisor_name,
index: 1,
api_client: api_client,
max_batch_time_ms: 60_000,
max_batch_events: 2}
)

expect(API.Mock, :request, fn _client, method, url, opts ->
assert method == :post
assert url == "/batch"

assert opts[:json] == %{
batch: ["bar", "foo"]
}

send(test_pid, :ready)

receive do
:go -> :ok
end

send(test_pid, :done)
end)

Sender.send("foo", @supervisor_name)
Sender.send("bar", @supervisor_name)

assert_receive :ready
[{^pid, :busy}] = Registry.lookup(registry, {PostHog.Sender, 1})
send(pid, :go)
assert_receive :done
:sys.get_status(pid)
[{^pid, :available}] = Registry.lookup(registry, {PostHog.Sender, 1})

send(pid, :batch_time_reached)
refute_receive :ready
end
end
end