Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .config/dotnet-tools.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"rollForward": false
},
"easybuild.shipit": {
"version": "1.1.2",
"version": "2.0.0",
"commands": [
"shipit"
],
Expand Down
58 changes: 32 additions & 26 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,41 @@ on:

permissions:
contents: write
pull-requests: write

jobs:
publish:
shipit-pr:
name: ShipIt - Pull Request
runs-on: ubuntu-latest
if: "!startsWith(github.event.head_commit.message, 'chore: release ')"
timeout-minutes: 10
steps:
- name: Checkout code
uses: actions/checkout@v6
with:
fetch-depth: 0

- name: Setup .NET
uses: actions/setup-dotnet@v5
with:
dotnet-version: 10.x

- name: Install tools
run: dotnet tool restore

- name: ShipIt (Pull Request)
run: dotnet shipit --pre-release rc
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

release:
name: Release
runs-on: ubuntu-latest
timeout-minutes: 10
if: "startsWith(github.event.head_commit.message, 'chore: release ')"
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6

- name: Setup .NET
uses: actions/setup-dotnet@v5
Expand All @@ -24,32 +50,12 @@ jobs:
- name: Install just
uses: extractions/setup-just@v3

- name: Extract version
id: version
run: |
MESSAGE="${{ github.event.head_commit.message }}"
VERSION="${MESSAGE#chore: release }"
VERSION="${VERSION%% *}"
# Strip package name prefix (e.g. Fable.Reactive@1.0.0-rc.4 -> 1.0.0-rc.4)
VERSION="${VERSION##*@}"
echo "version=$VERSION" >> "$GITHUB_OUTPUT"
echo "tag=v$VERSION" >> "$GITHUB_OUTPUT"

- name: Restore dependencies
- name: Install tools
run: |
dotnet tool restore
dotnet restore

- name: Pack NuGet
run: just pack-version ${{ steps.version.outputs.version }}

- name: Push NuGet
run: dotnet nuget push 'src/**/Release/*.nupkg' -s https://api.nuget.org/v3/index.json -k ${{ secrets.NUGET_API_KEY }}

- name: Push AsyncSeq NuGet
run: dotnet nuget push 'extra/**/Release/*.nupkg' -s https://api.nuget.org/v3/index.json -k ${{ secrets.NUGET_API_KEY }}

- name: Create GitHub Release
run: gh release create ${{ steps.version.outputs.tag }} --title "${{ steps.version.outputs.version }}" --generate-notes || echo "Release already exists"
- name: Build, pack and push
run: just release
env:
GH_TOKEN: ${{ github.token }}
NUGET_KEY: ${{ secrets.NUGET_API_KEY }}
19 changes: 11 additions & 8 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ restore:

# --- Packaging ---

# Create NuGet package
# Create NuGet packages with versions from changelog
pack:
dotnet build {{src_path}}
dotnet pack {{src_path}} -c Release

# Create NuGet package with specific version (used in CI)
pack-version version:
dotnet pack {{src_path}} -c Release -p:PackageVersion={{version}} -p:InformationalVersion={{version}}
dotnet pack extra/AsyncSeq -c Release -p:PackageVersion={{version}} -p:InformationalVersion={{version}}
#!/usr/bin/env bash
set -euo pipefail
get_version() { grep -m1 '^## ' "$1" | sed 's/^## \([^ ]*\).*/\1/'; }
VERSION=$(get_version CHANGELOG.md)
dotnet pack {{src_path}} -c Release -o ./nupkgs -p:PackageVersion=$VERSION -p:InformationalVersion=$VERSION
dotnet pack extra/AsyncSeq -c Release -o ./nupkgs -p:PackageVersion=$VERSION -p:InformationalVersion=$VERSION

# Pack and push all packages to NuGet (used in CI)
release: pack
dotnet nuget push './nupkgs/*.nupkg' -s https://api.nuget.org/v3/index.json -k $NUGET_KEY

# Run EasyBuild.ShipIt for release management
shipit *args:
Expand Down
2 changes: 1 addition & 1 deletion paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ group Reactive
framework: netstandard2.0

nuget FSharp.Core
nuget Fable.Actor
nuget Fable.Actor 5.0.0-rc.8

group AsyncSeq
source https://api.nuget.org/v3/index.json
Expand Down
26 changes: 15 additions & 11 deletions paket.lock
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@


GROUP Reactive
RESTRICTION: == netstandard2.0
NUGET
remote: https://api.nuget.org/v3/index.json
Fable.Actor (5.0.0-rc.2)
Fable.Core (>= 5.0.0-rc.1)
FSharp.Core (>= 10.1.201)
Fable.Core (5.0.0-rc.1)
FSharp.Core (>= 4.7.2)
FSharp.Core (11.0.100)

GROUP AsyncSeq
RESTRICTION: == netstandard2.1
NUGET
Expand All @@ -21,6 +10,21 @@ NUGET
FSharp.Core (11.0.100)
Microsoft.Bcl.AsyncInterfaces (10.0.2)

GROUP Reactive
RESTRICTION: == netstandard2.0
NUGET
remote: https://api.nuget.org/v3/index.json
Fable.Actor (5.0.0-rc.8)
Fable.Beam (>= 5.0.0-rc.22)
Fable.Core (>= 5.0.0-rc.1)
FSharp.Core (>= 10.1.201)
Fable.Beam (5.0.0-rc.22)
Fable.Core (5.0.0-rc.1)
FSharp.Core (>= 5.0)
Fable.Core (5.0.0-rc.1)
FSharp.Core (>= 4.7.2)
FSharp.Core (10.1.201)

GROUP Test
RESTRICTION: == net10.0
NUGET
Expand Down
156 changes: 156 additions & 0 deletions src/ActorInterop.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
namespace Fable.Reactive

open Fable.Actor
open Fable.Actor.Types

open Fable.Reactive.Core

[<RequireQualifiedAccess>]
module internal ActorInterop =

/// Wraps an Actor<Notification<'T>> as an IAsyncObserver<'T>.
/// Posts each notification to the actor. The actor owns its own lifecycle.
let toObserver (actor: Actor<Notification<'T>>) : IAsyncObserver<'T> =
{ new IAsyncObserver<'T> with
member _.OnNextAsync x = async { actor.Post(OnNext x) }
member _.OnErrorAsync err = async { actor.Post(OnError err) }
member _.OnCompletedAsync() = async { actor.Post OnCompleted } }

/// Subscribe an observable to an actor that receives Notification<'T>.
/// Actor lifecycle is caller-managed.
let subscribeActor (actor: Actor<Notification<'T>>) (source: IAsyncObservable<'T>) : Async<IReactiveDisposable> =
source.SubscribeAsync(toObserver actor)

/// For each upstream item, posts it to a supervised per-subscription actor.
/// The actor emits downstream via an emit callback provided at spawn time.
/// Terminal events bypass the actor and go directly to downstream.
/// The decider controls crash behavior:
/// Escalate → forward as OnErrorAsync (terminates the stream)
/// Stop → actor dies, stream continues (crashed item is lost)
/// Restart → actor is restarted, stream continues (crashed item is lost)
let flatMapActorSupervised
(decide: exn -> Directive)
(handler: ('TResult -> unit) -> Actor<'TSource> -> ActorOp<unit>)
(source: IAsyncObservable<'TSource>)
: IAsyncObservable<'TResult> =
let subscribeAsync (aobv: IAsyncObserver<'TResult>) =
async {
let dispatch, stream = Subjects.subject<'TResult> ()
let! innerDisp = stream.SubscribeAsync aobv

let emit value =
dispatch.OnNextAsync value |> Async.Start'

// Shared mutable ref — upstream posts directly to the child actor.
// Monitor swaps the ref on Restart.
let childRef = ref Unchecked.defaultof<Actor<'TSource>>
let ready = System.Threading.Tasks.TaskCompletionSource<unit>()

// Monitor actor: receives ChildExited and applies the supervision directive.
let monitor =
Actor.spawn (fun inbox ->
childRef.Value <- Actor.spawnLinked inbox (handler emit)
ready.SetResult()

let rec loop () =
async {
let! msg = inbox.Receive()

match Actor.tryAsChildExited msg with
| Some exited ->
let ex =
match exited.Reason with
| :? exn as e -> e
| r -> ProcessExitException(sprintf "%A" r)

match decide ex with
| Directive.Escalate -> do! aobv.OnErrorAsync ex
| Directive.Stop -> ()
| Directive.Restart -> childRef.Value <- Actor.spawnLinked inbox (handler emit)
| None -> ()

return! loop ()
}

loop ())

do! Async.AwaitTask ready.Task

let obv =
{ new IAsyncObserver<'TSource> with
member _.OnNextAsync x = async { childRef.Value.Post x }
member _.OnErrorAsync err = aobv.OnErrorAsync err
member _.OnCompletedAsync() = aobv.OnCompletedAsync() }

let! sourceDisp = source.SubscribeAsync obv

return
AsyncDisposable.Composite
[ sourceDisp
innerDisp
AsyncDisposable.Create(fun () -> async { Actor.kill monitor }) ]
}

{ new IAsyncObservable<'TResult> with
member _.SubscribeAsync o = subscribeAsync o }

/// For each upstream item, posts it to a per-subscription actor.
/// The actor emits downstream via an emit callback provided at spawn time.
/// Terminal events bypass the actor and go directly to downstream.
/// If the actor crashes, the error is forwarded downstream as OnErrorAsync.
let flatMapActor
(handler: ('TResult -> unit) -> Actor<'TSource> -> ActorOp<unit>)
(source: IAsyncObservable<'TSource>)
: IAsyncObservable<'TResult> =
flatMapActorSupervised (fun _ -> Directive.Escalate) handler source

/// Stateful 1-to-1 transform using an actor with request-reply (call).
/// Provides backpressure — the pipeline waits for the actor's reply before emitting downstream.
let mapActor
(handler: 'State -> 'TSource -> 'State * 'TResult)
(initialState: 'State)
(source: IAsyncObservable<'TSource>)
: IAsyncObservable<'TResult> =
let subscribeAsync (aobv: IAsyncObserver<'TResult>) =
async {
let actor =
Actor.spawn (fun inbox ->
let rec loop state =
async {
let! (value, rc: ReplyChannel<'TResult>) = inbox.Receive()
let state', result = handler state value
rc.Reply result
return! loop state'
}

loop initialState)

let obv =
{ new IAsyncObserver<'TSource> with
member _.OnNextAsync x =
async {
let! result = Actor.call actor x
do! aobv.OnNextAsync result
}

member _.OnErrorAsync err = aobv.OnErrorAsync err
member _.OnCompletedAsync() = aobv.OnCompletedAsync() }

let! disp = source.SubscribeAsync obv

return AsyncDisposable.Composite [ disp; AsyncDisposable.Create(fun () -> async { Actor.kill actor }) ]
}

{ new IAsyncObservable<'TResult> with
member _.SubscribeAsync o = subscribeAsync o }

/// Create an actor-backed subject. The actor body receives an emit callback.
/// Returns the actor (for posting messages) and an observable (for subscribing).
let ofActor (body: ('T -> unit) -> Actor<'Msg> -> ActorOp<unit>) : Actor<'Msg> * IAsyncObservable<'T> =
let dispatch, stream = Subjects.subject<'T> ()

let emit value =
dispatch.OnNextAsync value |> Async.Start'

let actor = Actor.spawn (body emit)
actor, stream
2 changes: 1 addition & 1 deletion src/Aggregate.fs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ module internal Aggregation =
: IAsyncObservable<IAsyncObservable<'TSource>> =
let subscribeAsync (aobv: IAsyncObserver<IAsyncObservable<'TSource>>) =
let agent =
spawn (fun inbox ->
Actor.spawn (fun inbox ->
let rec messageLoop ((groups, disposed): Map<'TKey, IAsyncObserver<'TSource>> * bool) =
async {
let! n = inbox.Receive()
Expand Down
37 changes: 37 additions & 0 deletions src/AsyncObservable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace Fable.Reactive
open System
open System.Threading
open Fable.Actor
open Fable.Actor.Types

open Fable.Reactive.Core

Expand Down Expand Up @@ -339,3 +340,39 @@ module Reactive =
/// Tap synchronously into the stream performing side effects by the given `onNext` action.
let tapOnNext (onNext: 'a -> unit) (source: IAsyncObservable<'a>) : IAsyncObservable<'a> =
Tap.tapOnNext onNext source

// ActorInterop Region

/// Subscribe an observable to an actor that receives Notification<'T>.
let subscribeActor (actor: Actor<Notification<'a>>) (source: IAsyncObservable<'a>) : Async<IReactiveDisposable> =
ActorInterop.subscribeActor actor source

/// For each upstream item, posts it to a supervised per-subscription actor.
/// The decider controls crash behavior: Escalate (OnError), Stop (drop), Restart (retry).
let flatMapActorSupervised
(decide: exn -> Directive)
(handler: ('b -> unit) -> Actor<'a> -> ActorOp<unit>)
(source: IAsyncObservable<'a>)
: IAsyncObservable<'b> =
ActorInterop.flatMapActorSupervised decide handler source

/// For each upstream item, posts it to a per-subscription actor.
/// The actor emits downstream via an emit callback. Actor crash → OnError.
let flatMapActor
(handler: ('b -> unit) -> Actor<'a> -> ActorOp<unit>)
(source: IAsyncObservable<'a>)
: IAsyncObservable<'b> =
ActorInterop.flatMapActor handler source

/// Stateful 1-to-1 transform using an actor with request-reply.
/// Provides backpressure — the pipeline waits for the actor's reply before emitting downstream.
let mapActor
(handler: 's -> 'a -> 's * 'b)
(initialState: 's)
(source: IAsyncObservable<'a>)
: IAsyncObservable<'b> =
ActorInterop.mapActor handler initialState source

/// Create an actor-backed subject. Returns the actor and an observable.
let ofActor (body: ('a -> unit) -> Actor<'msg> -> ActorOp<unit>) : Actor<'msg> * IAsyncObservable<'a> =
ActorInterop.ofActor body
4 changes: 2 additions & 2 deletions src/AsyncObserver.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ module AsyncObserver =
/// (OnNext* (OnError|OnCompleted)?) is not violated.
let safeObserver (obv: IAsyncObserver<'TSource>) (disposable: IReactiveDisposable) : IAsyncObserver<'TSource> =
let agent =
spawn (fun inbox ->
Actor.spawn (fun inbox ->
let rec messageLoop stopped =
async {
let! n = inbox.Receive()
Expand Down Expand Up @@ -92,7 +92,7 @@ module AsyncObserver =
(obv: IAsyncObserver<'TSource>)
: IAsyncObserver<'TSource> * (Async<IReactiveDisposable> -> Async<IReactiveDisposable>) =
let agent =
spawn (fun inbox ->
Actor.spawn (fun inbox ->
let rec messageLoop disposables =
async {
let! cmd = inbox.Receive()
Expand Down
Loading
Loading