Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cb13f0d
SSES Playground
joshlarson Feb 25, 2026
503745b
Move `PredictionsConsumerStage` to a separate file
joshlarson Apr 21, 2026
b643015
Add UpcomingDeparturesPubSub GenServer
joshlarson Apr 22, 2026
0f83595
Add live view to inspect the UpcomingDeparturesPubSub
joshlarson Apr 22, 2026
b71eb7a
Fix alias order
joshlarson Apr 22, 2026
1e0aa39
Add a subscribeable supervisor (and shut it down when it's not needed!)
joshlarson Apr 23, 2026
e343164
Add a broadcaster (that doesn't broadcast)
joshlarson Apr 25, 2026
2490fd4
Add `PredictionsWorker` and `PredictionsManager`
joshlarson Apr 27, 2026
0c9832f
Add `PredictionsSupervisor` and `PredictionsBroadcasterStage`
joshlarson Apr 27, 2026
b91729e
Refine `PredictionBroadcasterStage`
joshlarson Apr 28, 2026
0ee56f1
Remove unnecessary params from `PredictionsWorker`
joshlarson Apr 28, 2026
9dea473
Cleanup PredictionsStreamLive
joshlarson Apr 28, 2026
0d341af
Refactor PredictionsStreamLive
joshlarson Apr 28, 2026
14f4ff7
Restructure how predictions and events are shown in `PredictionsStrea…
joshlarson Apr 29, 2026
46e8638
fix: Include timezone info in `Prediction.StreamParser` datetimes
joshlarson Apr 29, 2026
09eba16
Show departure time on PredictionsStreamLive
joshlarson Apr 29, 2026
7fb18e1
Send a fake `"reset"` event on subscribe
joshlarson Apr 29, 2026
4f45c96
Show predictions snapshot and derived predictions
joshlarson Apr 30, 2026
b5c2755
Cleanup old modules that we're not using anymore
joshlarson Apr 30, 2026
db908c3
Extract shareable components from PredictionsStreamLive
joshlarson Apr 30, 2026
f1faeaa
Remove defunct `UpcomingDeparturesPubSubStateLive`
joshlarson Apr 30, 2026
9fc7e03
Add new UpcomingDeparturesStreamLive
joshlarson Apr 30, 2026
fc30698
Add UpcomingDeparturesManager and UpcomingDeparturesWorker
joshlarson Apr 30, 2026
5b8fb43
Merge remote-tracking branch 'origin/main' into jdl/sses-playground
joshlarson May 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/dotcom/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ defmodule Dotcom.Application do
[
Dotcom.ViaFairmount,
{Dotcom.SystemStatus.CommuterRailCache, []},
{Dotcom.SystemStatus.SubwayCache, []}
{Dotcom.SystemStatus.SubwayCache, []},
Dotcom.Playground.PredictionsManager,
Dotcom.Playground.UpcomingDeparturesManager
]
else
[]
Expand Down
98 changes: 98 additions & 0 deletions lib/dotcom/playground/predictions_broadcaster_stage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
defmodule Dotcom.Playground.PredictionsBroadcasterStage do
use GenStage

alias Predictions.StreamParser
alias ServerSentEventStage.Event

def start_link(opts) do
GenStage.start_link(__MODULE__, opts, name: opts |> Keyword.get(:name))
end

def terminate(_reason, _state), do: :ok

def init(opts) do
subscribe_to = opts |> Keyword.get(:subscribe_to)
publish_to = opts |> Keyword.get(:publish_to)

{:consumer, %{publish_to: publish_to, predictions: %{}}, subscribe_to: [subscribe_to]}
end

def handle_events(
events,
_from,
%{publish_to: publish_to, predictions: predictions} = state
) do
%{parsed_events: parsed_events, predictions: new_predictions} =
events |> Enum.reduce(%{predictions: predictions, parsed_events: []}, &handle_event/2)

send(
publish_to,
{:predictions_update,
%{
predictions: new_predictions |> Map.values(),
events: parsed_events |> Enum.reverse()
}}
)

{:noreply, [], %{state | predictions: new_predictions}}
end

defp handle_event(%Event{event: "reset", data: data}, %{
predictions: _predictions,
parsed_events: parsed_events
}) do
new_predictions =
data
|> JsonApi.parse()
|> then(fn %JsonApi{data: data} -> data end)
|> Map.new(fn %JsonApi.Item{id: id} = item -> {id, StreamParser.parse(item)} end)

%{
parsed_events: [{"reset", Map.values(new_predictions)} | parsed_events],
predictions: new_predictions
}
end

defp handle_event(%Event{event: event_type, data: data}, %{
predictions: predictions,
parsed_events: parsed_events
})
when event_type in ["add", "update"] do
{id, prediction} =
data
|> JsonApi.parse()
|> then(fn %JsonApi{data: data} -> data end)
|> then(fn [%JsonApi.Item{id: id} = item] -> {id, StreamParser.parse(item)} end)

%{
parsed_events: [{event_type, prediction} | parsed_events],
predictions: Map.put(predictions, id, prediction)
}
end

defp handle_event(%Event{event: "remove", data: data}, %{
predictions: predictions,
parsed_events: parsed_events
}) do
id =
data
|> JsonApi.parse()
|> then(fn %JsonApi{data: data} -> data end)
|> then(fn [%JsonApi.Item{id: id}] -> id end)

%{
parsed_events: [{"remove", Map.get(predictions, id)} | parsed_events],
predictions: Map.delete(predictions, id)
}
end

defp handle_event(%Event{event: event_type, data: data}, %{
predictions: predictions,
parsed_events: parsed_events
}) do
%{
parsed_events: [{event_type, data |> JsonApi.parse()} | parsed_events],
predictions: predictions
}
end
end
38 changes: 38 additions & 0 deletions lib/dotcom/playground/predictions_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Dotcom.Playground.PredictionsManager do
use GenServer

alias Dotcom.Playground.PredictionsWorker

# Client
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

def subscribe(params) do
GenServer.cast(__MODULE__, {:subscribe, self(), params})
end

def unsubscribe() do
GenServer.cast(__MODULE__, {:unsubscribe, self()})
end

# Server
def init(opts) do
{:ok, opts}
end

def handle_cast({:subscribe, pid, params}, state) do
PredictionsWorker.subscribe(pid, params)

{:noreply, state |> Map.put(pid, params)}
end

def handle_cast({:unsubscribe, pid}, state) do
case state do
%{^pid => params} -> PredictionsWorker.unsubscribe(pid, params)
_ -> nil
end

{:noreply, state |> Map.delete(pid)}
end
end
45 changes: 45 additions & 0 deletions lib/dotcom/playground/predictions_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
defmodule Dotcom.Playground.PredictionsSupervisor do
use Supervisor

alias Dotcom.Playground.PredictionsBroadcasterStage

# Client
def start_link(%{params: params} = args) do
Supervisor.start_link(__MODULE__, args, name: process_name(params))
end

def stop(%{params: params}) do
Supervisor.stop(process_name(params))
end

# Server
@impl Supervisor
def init(%{params: params, publish_to: publish_to}) do
query = URI.encode_query(params)

url = "#{base_url()}/predictions?#{query}"

Supervisor.init(
[
{ServerSentEventStage, url: url, headers: headers(), name: process_name({:sses, params})},
{PredictionsBroadcasterStage,
publish_to: publish_to,
subscribe_to: process_name({:sses, params}),
name: process_name({:broadcast, params})}
],
strategy: :one_for_all
)
end

defp process_name(args) do
{:global, {:predictions_supervisor, args}}
end

defp base_url() do
Application.get_env(:dotcom, :mbta_api)[:base_url]
end

defp headers() do
Application.get_env(:dotcom, :mbta_api)[:headers]
end
end
85 changes: 85 additions & 0 deletions lib/dotcom/playground/predictions_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule Dotcom.Playground.PredictionsWorker do
alias Dotcom.Playground.PredictionsSupervisor
use GenServer

# Client
def subscribe(caller_pid, params) do
GenServer.start_link(__MODULE__, params, name: process_name(params))
|> case do
{:ok, pid} ->
pid

{:error, {:already_started, pid}} ->
pid
end
|> GenServer.cast({:subscribe, caller_pid})
end

def unsubscribe(caller_pid, params) do
params
|> process_name()
|> GenServer.whereis()
|> GenServer.cast({:unsubscribe, caller_pid})
end

# Server
def init(params) do
PredictionsSupervisor.start_link(%{params: params, publish_to: self()})

{:ok, %{params: params, predictions: :loading, subscribers: MapSet.new()}}
end

def terminate(_reason, %{params: params}) do
PredictionsSupervisor.stop(%{params: params})
end

def handle_cast(
{:subscribe, pid},
%{predictions: predictions, subscribers: subscribers} = state
) do
new_subscribers =
subscribers
|> MapSet.put(pid)

publish_predictions_if_any(pid, predictions)

{:noreply, %{state | subscribers: new_subscribers}}
end

def handle_cast({:unsubscribe, pid}, %{subscribers: subscribers} = state) do
new_subscribers =
subscribers
|> MapSet.delete(pid)

new_state = %{state | subscribers: new_subscribers}

if Enum.empty?(new_subscribers) do
{:stop, :normal, new_state}
else
{:noreply, new_state}
end
end

def handle_info(
{:predictions_update, %{predictions: predictions} = data},
%{subscribers: subscribers} = state
) do
subscribers |> Enum.each(&send(&1, {:predictions_update, data}))

{:noreply, %{state | predictions: {:ok, predictions}}}
end

defp process_name(params) do
{:global, {:predictions, params}}
end

defp publish_predictions_if_any(_pid, :loading) do
end

defp publish_predictions_if_any(pid, {:ok, predictions}) do
send(
pid,
{:predictions_update, %{predictions: predictions, events: [{"reset", predictions}]}}
)
end
end
38 changes: 38 additions & 0 deletions lib/dotcom/playground/upcoming_departures_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Dotcom.Playground.UpcomingDeparturesManager do
use GenServer

alias Dotcom.Playground.UpcomingDeparturesWorker

# Client
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

def subscribe(params) do
GenServer.cast(__MODULE__, {:subscribe, self(), params})
end

def unsubscribe() do
GenServer.cast(__MODULE__, {:unsubscribe, self()})
end

# Server
def init(opts) do
{:ok, opts}
end

def handle_cast({:subscribe, pid, params}, state) do
UpcomingDeparturesWorker.subscribe(pid, params)

{:noreply, state |> Map.put(pid, params)}
end

def handle_cast({:unsubscribe, pid}, state) do
case state do
%{^pid => params} -> UpcomingDeparturesWorker.unsubscribe(pid, params)
_ -> nil
end

{:noreply, state |> Map.delete(pid)}
end
end
Loading
Loading