Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Example test pattern:

```fsharp
testAsync "Test name" {
let xs = AsyncRx.single 42 |> AsyncRx.map (fun x -> x * 10)
let xs = Reactive.single 42 |> Reactive.map (fun x -> x * 10)
let obv = TestObserver<int>()
let! sub = xs.SubscribeAsync obv
let! latest = obv.Await()
Expand All @@ -54,7 +54,7 @@ testAsync "Test name" {

### Core Types (`src/Types.fs`)

- `IAsyncRxDisposable` - Async disposable (Fable-compatible name)
- `IReactiveDisposable` - Async disposable for subscriptions
- `IAsyncObserver<'T>` - Async observer with OnNextAsync/OnErrorAsync/OnCompletedAsync
- `IAsyncObservable<'T>` - Async observable with SubscribeAsync
- `Notification<'T>` - OnNext/OnError/OnCompleted discriminated union
Expand All @@ -71,8 +71,8 @@ testAsync "Test name" {
| Aggregate.fs | `scan`, `reduce`, `groupBy`, `min`, `max` |
| Timeshift.fs | `delay`, `debounce`, `sample` |
| Subject.fs | Hot/cold stream subjects for multicast |
| AsyncObservable.fs | Main API module exporting all operators via `AsyncRx` |
| Builder.fs | Query/computation expression builder (`asyncRx { }`) |
| AsyncObservable.fs | Main API module exporting all operators via `Reactive` |
| Builder.fs | Query/computation expression builder (`reactive { }`) |

### Key Patterns

Expand Down
2 changes: 1 addition & 1 deletion Fable.Reactive.sln
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26124.0
MinimumVisualStudioVersion = 15.0.26124.0
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "AsyncRx", "src\Fable.Reactive.fsproj", "{8D83B5B9-A2E2-45F4-8B8E-B4A65CFF4CC9}"
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Fable.Reactive", "src\Fable.Reactive.fsproj", "{8D83B5B9-A2E2-45F4-8B8E-B4A65CFF4CC9}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Tests", "test\Tests.fsproj", "{B384F8B3-C04F-4F47-8DB0-B1E9B6339B5C}"
EndProject
Expand Down
15 changes: 6 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
# Fable.Reactive

![Build and Test](https://github.com/dbrattli/AsyncRx/workflows/Build%20and%20Test/badge.svg)
[![codecov](https://codecov.io/gh/dbrattli/AsyncRx/branch/main/graph/badge.svg)](https://codecov.io/gh/dbrattli/AsyncRx)
![Build and Test](https://github.com/dbrattli/Fable.Reactive/workflows/Build%20and%20Test/badge.svg)
[![Nuget](https://img.shields.io/nuget/vpre/Fable.Reactive)](https://www.nuget.org/packages/Fable.Reactive/)

> Fable.Reactive is a lightweight Async Reactive (AsyncRx) library for F#.
> Fable.Reactive is a lightweight Async Reactive library for F#.

Fable.Reactive is a library for asynchronous reactive
programming, and is the implementation of Async Observables
Expand Down Expand Up @@ -35,20 +34,18 @@ your open statements:
+ open Fable.Reactive
```

All sub-namespaces have been renamed accordingly:
All sub-namespaces and modules have been renamed accordingly:

| Old | New |
|-----|-----|
| `FSharp.Control` | `Fable.Reactive` |
| `FSharp.Control.Core` | `Fable.Reactive.Core` |
| `FSharp.Control.AsyncRx` | `Fable.Reactive.AsyncRx` |
| `FSharp.Control.Subjects` | `Fable.Reactive.Subjects` |

The API itself remains unchanged.
| `AsyncRx` module | `Reactive` module |
| `IAsyncRxDisposable` | `IReactiveDisposable` |

## Documentation

Please check out the [documentation](https://fablereaction.readthedocs.io/en/latest/asyncrx/index.html)
Documentation is currently being updated.

## Install

Expand Down
4 changes: 2 additions & 2 deletions extra/AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ open FSharp.Control
open Fable.Reactive

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module AsyncRx =
module Reactive =
/// Convert async sequence into an async observable.
let ofAsyncSeq (xs: AsyncSeq<'TSource>) : IAsyncObservable<'TSource> =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IAsyncRxDisposable> =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IReactiveDisposable> =
let cancel, token = canceller ()

async {
Expand Down
4 changes: 2 additions & 2 deletions extra/AsyncSeq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ Extension for converting to and from AsyncSeq.

Adds to functions:

- `AsyncRx.ofAsyncSeq`
- `AsyncRx.toAsyncSeq`
- `Reactive.ofAsyncSeq`
- `Reactive.toAsyncSeq`

## Install

Expand Down
2 changes: 1 addition & 1 deletion paket.dependencies
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
group AsyncRx
group Reactive
source https://api.nuget.org/v3/index.json
framework: netstandard2.0

Expand Down
2 changes: 1 addition & 1 deletion paket.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


GROUP AsyncRx
GROUP Reactive
RESTRICTION: == netstandard2.0
NUGET
remote: https://api.nuget.org/v3/index.json
Expand Down
18 changes: 9 additions & 9 deletions src/AsyncDisposable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ open Fable.Reactive.Core
type AsyncDisposable private (cancel) =
let mutable isDisposed = 0

interface IAsyncRxDisposable with
interface IReactiveDisposable with
member this.DisposeAsync() =
async {
#if FABLE_COMPILER
Expand All @@ -22,21 +22,21 @@ type AsyncDisposable private (cancel) =
#endif
}

static member Create cancel : IAsyncRxDisposable =
AsyncDisposable cancel :> IAsyncRxDisposable
static member Create cancel : IReactiveDisposable =
AsyncDisposable cancel :> IReactiveDisposable

static member Empty: IAsyncRxDisposable =
static member Empty: IReactiveDisposable =
let cancel () = async { return () }
AsyncDisposable cancel :> IAsyncRxDisposable
AsyncDisposable cancel :> IReactiveDisposable

static member Composite(disposables: IAsyncRxDisposable seq) : IAsyncRxDisposable =
static member Composite(disposables: IReactiveDisposable seq) : IReactiveDisposable =
let cancel () =
async {
for d in disposables do
do! d.DisposeAsync()
}

AsyncDisposable cancel :> IAsyncRxDisposable
AsyncDisposable cancel :> IReactiveDisposable

type Disposable(cancel) =
let mutable isDisposed = 0
Expand Down Expand Up @@ -68,15 +68,15 @@ type Disposable(cancel) =

[<AutoOpen>]
module AsyncDisposable =
type IAsyncRxDisposable with
type IReactiveDisposable with

member this.ToDisposable() =
{ new IDisposable with
member __.Dispose() = this.DisposeAsync() |> Async.Start' }

type System.IDisposable with

member this.ToAsyncDisposable() : IAsyncRxDisposable =
member this.ToAsyncDisposable() : IReactiveDisposable =
AsyncDisposable.Create(fun () -> async { this.Dispose() })

let canceller () =
Expand Down
7 changes: 4 additions & 3 deletions src/AsyncObservable.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module AsyncObservable =
member this.Run(obv: IAsyncObserver<'TSource>) = this.RunAsync obv |> Async.Start'

/// Subscribes the async observer function (`Notification{'a} -> Async{unit}`) to the AsyncObservable
member this.SubscribeAsync<'TSource>(obv: Notification<'TSource> -> Async<unit>) : Async<IAsyncRxDisposable> =
member this.SubscribeAsync<'TSource>(obv: Notification<'TSource> -> Async<unit>) : Async<IReactiveDisposable> =
this.SubscribeAsync(AsyncObserver obv)

/// Returns an observable sequence that contains the elements of the given sequences concatenated together.
Expand All @@ -50,7 +50,8 @@ module Observable =

/// A single module that contains all the operators. Nicer and shorter way than writing AsyncObservable. We want to
/// prefix our operators so we don't mix e.g. `map` with other modules.
module AsyncRx =
[<RequireQualifiedAccess>]
module Reactive =

// Aggregate Region

Expand Down Expand Up @@ -135,7 +136,7 @@ module AsyncRx =

/// Creates an async observable (`AsyncObservable{'a}`) from the
/// given subscribe function.
let create (subscribe: IAsyncObserver<'a> -> Async<IAsyncRxDisposable>) : IAsyncObservable<'a> =
let create (subscribe: IAsyncObserver<'a> -> Async<IReactiveDisposable>) : IAsyncObservable<'a> =
Create.create subscribe

// Returns an observable sequence that invokes the specified factory
Expand Down
8 changes: 4 additions & 4 deletions src/AsyncObserver.fs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ module AsyncObserver =

/// Safe observer that wraps the given observer. Makes sure that invocations are serialized and that the Rx grammar
/// (OnNext* (OnError|OnCompleted)?) is not violated.
let safeObserver (obv: IAsyncObserver<'TSource>) (disposable: IAsyncRxDisposable) : IAsyncObserver<'TSource> =
let safeObserver (obv: IAsyncObserver<'TSource>) (disposable: IReactiveDisposable) : IAsyncObserver<'TSource> =
let agent =
spawn (fun inbox ->
let rec messageLoop stopped =
Expand Down Expand Up @@ -85,12 +85,12 @@ module AsyncObserver =
member this.OnCompletedAsync() = async { OnCompleted |> agent.Post } }

type private Msg =
| Disposable of IAsyncRxDisposable
| Disposable of IReactiveDisposable
| Dispose

let autoDetachObserver
(obv: IAsyncObserver<'TSource>)
: IAsyncObserver<'TSource> * (Async<IAsyncRxDisposable> -> Async<IAsyncRxDisposable>) =
: IAsyncObserver<'TSource> * (Async<IReactiveDisposable> -> Async<IReactiveDisposable>) =
let agent =
spawn (fun inbox ->
let rec messageLoop disposables =
Expand All @@ -117,7 +117,7 @@ module AsyncObserver =
let safeObv = AsyncDisposable.Create cancel |> safeObserver obv

// Auto-detaches (disposes) the disposable when the observer completes with success or error.
let autoDetach (disposable: Async<IAsyncRxDisposable>) =
let autoDetach (disposable: Async<IReactiveDisposable>) =
async {
let! disp = disposable
agent.Post(Disposable disp)
Expand Down
4 changes: 2 additions & 2 deletions src/Builder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ type QueryBuilder() =
[<AutoOpen>]
module QueryBuilder =
/// Query builder for an async reactive event source
let asyncRx = QueryBuilder()
let reactive = QueryBuilder()

/// We extend AsyncBuilder to use `use!` for resource managemnt when using async builder.
type AsyncBuilder with

member builder.Using(resource: #IAsyncRxDisposable, f: #IAsyncRxDisposable -> Async<'TSource>) =
member builder.Using(resource: #IReactiveDisposable, f: #IReactiveDisposable -> Async<'TSource>) =
let mutable x = 0

let disposeFunction _ =
Expand Down
2 changes: 1 addition & 1 deletion src/Combine.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module internal Combine =
type Key = int

type Model<'a> =
{ Subscriptions: Map<Key, IAsyncRxDisposable>
{ Subscriptions: Map<Key, IReactiveDisposable>
Queue: IAsyncObservable<'a> list
IsStopped: bool
Key: Key }
Expand Down
14 changes: 7 additions & 7 deletions src/Create.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ module internal Create =

/// Creates an async observable (`AsyncObservable{'TSource}`) from the
/// given subscribe function.
let create (subscribe: IAsyncObserver<'TSource> -> Async<IAsyncRxDisposable>) : IAsyncObservable<'TSource> =
let create (subscribe: IAsyncObserver<'TSource> -> Async<IReactiveDisposable>) : IAsyncObservable<'TSource> =
{ new IAsyncObservable<'TSource> with
member _.SubscribeAsync o = subscribe o }

// Create async observable from async worker function
let ofAsyncWorker
(worker: IAsyncObserver<'TSource> -> CancellationToken -> Async<unit>)
: IAsyncObservable<'TSource> =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IAsyncRxDisposable> =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IReactiveDisposable> =
let disposable, token = canceller ()
let safeObv = safeObserver aobv disposable

Expand All @@ -49,7 +49,7 @@ module internal Create =

/// Returns an observable sequence containing the single specified element.
let single (value: 'TSource) =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IAsyncRxDisposable> =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IReactiveDisposable> =
let safeObv = safeObserver aobv AsyncDisposable.Empty

async {
Expand All @@ -63,7 +63,7 @@ module internal Create =

/// Returns an observable sequence with no elements.
let inline empty<'TSource> () : IAsyncObservable<'TSource> =
let subscribeAsync (aobv: IAsyncObserver<_>) : Async<IAsyncRxDisposable> =
let subscribeAsync (aobv: IAsyncObserver<_>) : Async<IReactiveDisposable> =
async {
do! aobv.OnCompletedAsync()
return AsyncDisposable.Empty
Expand All @@ -74,7 +74,7 @@ module internal Create =

/// Returns an empty observable sequence that never completes.
let inline never<'TSource> () : IAsyncObservable<'TSource> =
let subscribeAsync (_: IAsyncObserver<_>) : Async<IAsyncRxDisposable> = async { return AsyncDisposable.Empty }
let subscribeAsync (_: IAsyncObserver<_>) : Async<IReactiveDisposable> = async { return AsyncDisposable.Empty }

{ new IAsyncObservable<'TSource> with
member _.SubscribeAsync o = subscribeAsync o }
Expand All @@ -100,7 +100,7 @@ module internal Create =

// Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
let defer (factory: unit -> IAsyncObservable<'TSource>) : IAsyncObservable<'TSource> =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IAsyncRxDisposable> =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IReactiveDisposable> =
async {
let result =
try
Expand All @@ -117,7 +117,7 @@ module internal Create =
/// Returns an observable sequence that triggers the increasing sequence starting with 0 after the given msecs, and
/// the after each period.
let interval (msecs: int) (period: int) : IAsyncObservable<int> =
let subscribeAsync (aobv: IAsyncObserver<int>) : Async<IAsyncRxDisposable> =
let subscribeAsync (aobv: IAsyncObserver<int>) : Async<IReactiveDisposable> =
let cancel, token = canceller ()

async {
Expand Down
2 changes: 1 addition & 1 deletion src/Fable.Reactive.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageTags>fsharp;fable;fable-library;fable-dotnet;fable-javascript;fable-python</PackageTags>
<Description>Async Reactive (AsyncRx) library for F# and Fable</Description>
<Description>Async Reactive library for F# and Fable</Description>
</PropertyGroup>
<ItemGroup>
<Compile Include="Types.fs" />
Expand Down
4 changes: 2 additions & 2 deletions src/Subject.fs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ module internal Subjects =

waitForSubscriber [])

let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IAsyncRxDisposable> =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IReactiveDisposable> =
let sobv = safeObserver aobv AsyncDisposable.Empty
actor.Post(Subscribe sobv)

Expand Down Expand Up @@ -99,7 +99,7 @@ module internal Subjects =

messageLoop ())

let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IAsyncRxDisposable> =
let subscribeAsync (aobv: IAsyncObserver<'TSource>) : Async<IReactiveDisposable> =
async {
let sobv = safeObserver aobv AsyncDisposable.Empty
obvs.Add sobv
Expand Down
8 changes: 4 additions & 4 deletions src/Transform.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module internal Transform =
(nextAsync: ('TResult -> Async<unit>) -> 'TSource -> Async<unit>)
(source: IAsyncObservable<'TSource>)
: IAsyncObservable<'TResult> =
let subscribeAsync (aobv: IAsyncObserver<'TResult>) : Async<IAsyncRxDisposable> =
let subscribeAsync (aobv: IAsyncObserver<'TResult>) : Async<IReactiveDisposable> =
{ new IAsyncObserver<'TSource> with
member _.OnNextAsync x = nextAsync aobv.OnNextAsync x
member _.OnErrorAsync err = aobv.OnErrorAsync err
Expand Down Expand Up @@ -101,7 +101,7 @@ module internal Transform =

let agent =
spawn (fun inbox ->
let rec messageLoop (current: IAsyncRxDisposable option, isStopped, currentId) =
let rec messageLoop (current: IReactiveDisposable option, isStopped, currentId) =
async {
let! cmd = inbox.Receive()

Expand Down Expand Up @@ -239,7 +239,7 @@ module internal Transform =

let mb =
spawn (fun inbox ->
let rec messageLoop (count: int) (subscription: IAsyncRxDisposable) =
let rec messageLoop (count: int) (subscription: IReactiveDisposable) =
async {
let! cmd = inbox.Receive()

Expand Down Expand Up @@ -282,7 +282,7 @@ module internal Transform =
member _.SubscribeAsync o = subscribeAsync o }

let toObservable (source: IAsyncObservable<'TSource>) : IObservable<'TSource> =
let mutable subscription: IAsyncRxDisposable = AsyncDisposable.Empty
let mutable subscription: IReactiveDisposable = AsyncDisposable.Empty

{ new IObservable<'TSource> with
member _.Subscribe obv =
Expand Down
Loading
Loading