Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/hex/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ defmodule Hex.Application do
[
Hex.Netrc.Cache,
Hex.OAuth,
Hex.Repo,
Hex.State,
Hex.Server,
{Hex.Parallel, [:hex_fetcher]}
Expand All @@ -60,6 +61,7 @@ defmodule Hex.Application do
[
Hex.Netrc.Cache,
Hex.OAuth,
Hex.Repo,
Hex.State,
Hex.Server,
{Hex.Parallel, [:hex_fetcher]},
Expand Down
149 changes: 129 additions & 20 deletions lib/hex/once_cache.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
defmodule Hex.OnceCache do
@moduledoc """
A cache that computes a value once on first access and caches it.
A cache that computes values at most once and caches them.

Uses Agent.get_and_update/2 to ensure only one process computes the value,
even when multiple processes access it concurrently. All other processes
will wait for the Agent call to complete and receive the computed value.
Supports both single-value caching via `fetch/3` and keyed caching via
`fetch_key/4`. Computations run in the caller's process, allowing concurrent
computations for different keys. Multiple callers requesting the same key
will wait for the first caller's computation to complete.

## Example

Expand All @@ -26,18 +27,18 @@ defmodule Hex.OnceCache do
# => :expensive_result (no "Computing..." output)
"""

use Agent
use GenServer

@doc """
Starts a new OnceCache Agent.
Starts a new OnceCache.

## Options

* `:name` - The name to register the cache under (required)
"""
def start_link(opts) do
name = Keyword.fetch!(opts, :name)
Agent.start_link(fn -> :not_cached end, name: name)
GenServer.start_link(__MODULE__, :ok, name: name)
end

@doc """
Expand All @@ -52,33 +53,141 @@ defmodule Hex.OnceCache do
Use `:infinity` for operations that may take a long time (e.g., user interaction).
"""
def fetch(name, compute_fun, opts \\ []) do
fetch_key(name, :__single__, compute_fun, opts)
end

@doc """
Fetches a keyed cached value or computes it if not yet cached.

Like `fetch/3`, but supports multiple independent cached values identified by key.
The compute function is only called once per key, even with concurrent access.
Computations for different keys run concurrently in their respective caller processes.

Should not be mixed with `fetch/3` or `put/2` on the same cache.
"""
def fetch_key(name, key, compute_fun, opts \\ []) do
timeout = Keyword.get(opts, :timeout, 5000)

Agent.get_and_update(
name,
fn
:not_cached ->
value = compute_fun.()
{value, {:cached, value}}
case GenServer.call(name, {:fetch, key}, timeout) do
{:ok, value} ->
value

{:cached, cached} ->
{cached, {:cached, cached}}
end,
timeout
)
:compute ->
try do
value = compute_fun.()
:ok = GenServer.call(name, {:computed, key, value}, timeout)
value
catch
kind, reason ->
GenServer.cast(name, {:failed, key})
:erlang.raise(kind, reason, __STACKTRACE__)
end
end
end

@doc """
Stores a value in the cache without computing it.
"""
def put(name, value) do
Agent.update(name, fn _ -> {:cached, value} end)
GenServer.call(name, {:put, :__single__, value})
end

@doc """
Clears the cache.
"""
def clear(name) do
Agent.update(name, fn _ -> :not_cached end)
GenServer.call(name, :clear)
end

# GenServer callbacks

@impl true
def init(:ok) do
{:ok, %{}}
end

@impl true
def handle_call({:fetch, key}, {pid, _} = from, state) do
case Map.get(state, key) do
{:cached, value} ->
{:reply, {:ok, value}, state}

{:computing, _mon_ref, _waiters} ->
{:noreply, update_waiters(state, key, from)}

nil ->
mon_ref = Process.monitor(pid)
{:reply, :compute, Map.put(state, key, {:computing, mon_ref, []})}
end
end

def handle_call({:computed, key, value}, _from, state) do
case Map.get(state, key) do
{:computing, mon_ref, waiters} ->
Process.demonitor(mon_ref, [:flush])

for waiter <- waiters do
GenServer.reply(waiter, {:ok, value})
end

{:reply, :ok, Map.put(state, key, {:cached, value})}

_ ->
{:reply, :ok, Map.put(state, key, {:cached, value})}
end
end

def handle_call({:put, key, value}, _from, state) do
{:reply, :ok, Map.put(state, key, {:cached, value})}
end

def handle_call(:clear, _from, _state) do
{:reply, :ok, %{}}
end

@impl true
def handle_cast({:failed, key}, state) do
case Map.get(state, key) do
{:computing, mon_ref, waiters} ->
Process.demonitor(mon_ref, [:flush])
{:noreply, hand_off_or_remove(state, key, waiters)}

_ ->
{:noreply, state}
end
end

@impl true
def handle_info({:DOWN, mon_ref, :process, _pid, _reason}, state) do
case find_computing_key(state, mon_ref) do
{key, waiters} ->
{:noreply, hand_off_or_remove(state, key, waiters)}

nil ->
{:noreply, state}
end
end

defp update_waiters(state, key, from) do
Map.update!(state, key, fn {:computing, mon_ref, waiters} ->
{:computing, mon_ref, [from | waiters]}
end)
end

defp hand_off_or_remove(state, key, [{pid, _} = next | rest]) do
new_mon_ref = Process.monitor(pid)
GenServer.reply(next, :compute)
Map.put(state, key, {:computing, new_mon_ref, rest})
end

defp hand_off_or_remove(state, key, []) do
Map.delete(state, key)
end

defp find_computing_key(state, mon_ref) do
Enum.find_value(state, fn
{key, {:computing, ^mon_ref, waiters}} -> {key, waiters}
_ -> nil
end)
end
end
27 changes: 22 additions & 5 deletions lib/hex/repo.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Hex.Repo do
@moduledoc false

@exchange_cache __MODULE__.ExchangeCache
@hexpm_url "https://repo.hex.pm"
@hexpm_public_key """
-----BEGIN PUBLIC KEY-----
Expand All @@ -14,6 +15,21 @@ defmodule Hex.Repo do
-----END PUBLIC KEY-----
"""

def start_link(_args) do
Hex.OnceCache.start_link(name: @exchange_cache)
end

def child_spec(arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [arg]}
}
end

def clear_exchange_cache do
Hex.OnceCache.clear(@exchange_cache)
end

def fetch_repo(repo) do
repo = repo || "hexpm"
repos = Hex.State.fetch!(:repos)
Expand Down Expand Up @@ -335,11 +351,12 @@ defmodule Hex.Repo do
{:ok, access_token} ->
{:ok, access_token}

:expired ->
do_exchange_api_key(repo_config, repo_name)

:not_found ->
do_exchange_api_key(repo_config, repo_name)
_expired_or_not_found ->
Hex.OnceCache.fetch_key(
@exchange_cache,
{repo_name, repo_config.auth_key},
fn -> do_exchange_api_key(repo_config, repo_name) end
)
end
end

Expand Down
Loading
Loading