-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEventStoreMacros.fs
More file actions
124 lines (108 loc) · 6.04 KB
/
EventStoreMacros.fs
File metadata and controls
124 lines (108 loc) · 6.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
namespace Macros
open Microsoft.FSharp.Core
module EventStore =
/// Small convenience layer over MailboxProcessor so that agents can be used
/// in pipelines with the agent as the first argument.
module AgentHelper =
    /// Shorthand alias for MailboxProcessor.
    type Agent<'T> = MailboxProcessor<'T>

    /// Enqueues `message` on `agent` via `Post`; no reply is awaited.
    let post (agent:Agent<'T>) message =
        message |> agent.Post

    /// Sends the message built by `messageConstr` (which receives the reply
    /// channel) via `PostAndAsyncReply` and returns the resulting async reply.
    let postAsyncReply (agent:Agent<'T>) messageConstr =
        agent.PostAndAsyncReply(fun replyChannel -> messageConstr replyChannel)
open AgentHelper
/// Identifier of an event stream: a single-case union wrapping an int.
type StreamId = StreamId of int
/// Version of an event stream: a single-case union wrapping an int.
/// In this file it is always derived from the number of events held for the stream.
type StreamVersion = StreamVersion of int
/// Outcome of a save request: either the events were stored (`Saved`) or the
/// request was rejected (`VersionConflict`).
type SaveResult = | Saved | VersionConflict
/// Messages understood by the event store agent (see `eventStoreAgent`).
/// 'T is the event type.
[<NoComparison;NoEquality>]
type Messages<'T> =
/// Ask for the version of a stream; the answer is sent on the reply channel.
| GetVersion of StreamId * AsyncReplyChannel<StreamVersion>
/// Ask for the events of a stream; the reply is an option of the event list.
| GetEvents of StreamId * AsyncReplyChannel<'T list option>
/// Ask to save events to a stream. Carries the stream id, the version the
/// sender expects the stream to be at, the events, and the reply channel
/// that receives the SaveResult.
| SaveEvents of StreamId * StreamVersion * 'T list * AsyncReplyChannel<SaveResult>
/// Register a callback under a string key. The agent keeps callbacks in a Map,
/// so registering under an existing key replaces the previous callback.
| AddSubscriber of string * (StreamId * 'T list -> unit)
/// Remove the callback registered under the given key.
| RemoveSubscriber of string
/// State carried from one iteration of the agent's message loop to the next.
/// 'TEvent is the event type; 'THandler is the storage value that the three
/// operation functions take and return (possibly updated).
[<NoComparison;NoEquality>]
type internal EventStoreState<'TEvent,'THandler> =
{
// Current storage value; replaced by the handler each operation returns.
EventHandler: 'THandler
// Returns the version of a stream together with the next storage value.
GetVersion: 'THandler -> StreamId -> (StreamVersion *'THandler)
// Returns the events of a stream (as an option) together with the next storage value.
GetEvents: 'THandler -> StreamId -> ('TEvent list option * 'THandler)
// Takes stream id, a stream version and the events; returns the outcome
// together with the next storage value.
SaveEvents: 'THandler -> StreamId -> StreamVersion -> 'TEvent list -> (SaveResult * 'THandler)
// Callbacks keyed by subscriber id.
Subscribers: Map<string, (StreamId * 'TEvent list -> unit)>
}
/// Body of the event store agent: receives `Messages` from `inbox` one at a
/// time and answers them using the supplied storage functions.
///
/// `eventHandler` is the initial storage value. `getVersion`, `getEvents` and
/// `saveEvents` each take the current storage value and return their result
/// paired with the storage value to use for the next message.
let eventStoreAgent<'T, 'TEventHandler> (eventHandler:'TEventHandler) getVersion getEvents saveEvents (inbox:Agent<Messages<'T>>) =
    // Processes one message and yields the state for the next iteration.
    let handle state msg =
        match msg with
        | GetVersion (streamId, reply) ->
            let version, handler = state.GetVersion state.EventHandler streamId
            reply.Reply(version)
            { state with EventHandler = handler }
        | GetEvents (streamId, reply) ->
            let events, handler = state.GetEvents state.EventHandler streamId
            reply.Reply(events)
            { state with EventHandler = handler }
        | SaveEvents (streamId, expectedVersion, events, reply) ->
            let outcome, handler = state.SaveEvents state.EventHandler streamId expectedVersion events
            // Subscribers are notified only for successful saves, and before
            // the sender receives its reply.
            match outcome with
            | Saved ->
                for KeyValue (_, notify) in state.Subscribers do
                    notify (streamId, events)
            | VersionConflict -> ()
            reply.Reply(outcome)
            { state with EventHandler = handler }
        | AddSubscriber (subscriberId, notify) ->
            { state with Subscribers = Map.add subscriberId notify state.Subscribers }
        | RemoveSubscriber subscriberId ->
            { state with Subscribers = Map.remove subscriberId state.Subscribers }

    let rec loop state =
        async {
            let! msg = inbox.Receive()
            return! loop (handle state msg)
        }

    loop
        {
            EventHandler = eventHandler
            GetVersion = getVersion
            GetEvents = getEvents
            SaveEvents = saveEvents
            Subscribers = Map.empty
        }
/// Starts a mailbox agent running `eventStoreAgent` with the given initial
/// storage value and storage functions, and returns the started agent.
let createEventStoreAgent<'TEvent, 'TEventHandler> eventHandler getVersion getEvents saveEvents =
    let agentBody =
        eventStoreAgent<'TEvent, 'TEventHandler> eventHandler getVersion getEvents saveEvents
    Agent.Start(agentBody)
/// Record of functions through which callers use an event store.
/// 'TEvent is the event type; 'TError is the error type carried in the
/// second case of the Choice results.
[<NoComparison;NoEquality>]
type EventStore<'TEvent, 'TError> =
{
// Returns the stream's version and events, or an error.
GetEvents: StreamId -> Choice<StreamVersion*'TEvent list, 'TError>
// Returns the stream's version.
GetVersion: StreamId -> StreamVersion
// Takes stream id, a stream version and the events to save; returns an
// event list on success or an error.
SaveEvents: StreamId -> StreamVersion -> 'TEvent list -> Choice<'TEvent list, 'TError>
// Registers a callback under the given string key.
AddSubscriber: string -> (StreamId * 'TEvent list -> unit) -> unit
// Removes the callback registered under the given string key.
RemoveSubscriber: string -> unit
}
/// Wraps an event store agent in an `EventStore` record of plain functions.
///
/// The query and save functions send a message to `agent` and run the reply
/// with `Async.RunSynchronously`, so they return only once the agent has
/// answered. The subscriber functions just post a message and return.
/// `versionError` is the error value returned when the agent answers a save
/// with `VersionConflict`.
let createEventStore<'TEvent, 'TError> (versionError:'TError) agent =
    let getEvents streamId : Choice<StreamVersion*'TEvent list, 'TError> =
        let stored =
            postAsyncReply agent (fun reply -> GetEvents (streamId, reply))
            |> Async.RunSynchronously
        // A stream the agent has no events for is reported as empty, version 0.
        let events =
            match stored with
            | Some found -> found
            | None -> []
        Choice1Of2 (StreamVersion (List.length events), events)

    let saveEvents streamId expectedVersion events : Choice<'TEvent list, 'TError> =
        let outcome =
            postAsyncReply agent (fun reply -> SaveEvents (streamId, expectedVersion, events, reply))
            |> Async.RunSynchronously
        match outcome with
        | VersionConflict -> Choice2Of2 versionError
        | Saved -> Choice1Of2 events

    let addSubscriber subId subscriber =
        post agent (AddSubscriber (subId, subscriber))

    let removeSubscriber subId =
        post agent (RemoveSubscriber subId)

    let getVersion streamId : StreamVersion =
        postAsyncReply agent (fun reply -> GetVersion (streamId, reply))
        |> Async.RunSynchronously

    {
        GetEvents = getEvents
        SaveEvents = saveEvents
        AddSubscriber = addSubscriber
        RemoveSubscriber = removeSubscriber
        GetVersion = getVersion
    }
/// Creates an `EventStore` that keeps every stream in an in-memory
/// `Map<StreamId, 'TEvent list>` owned by an event store agent.
///
/// A stream's version is the number of events stored for it; a stream that
/// has never been written is treated as empty, i.e. at version 0.
/// `versionError` is the error value returned by `SaveEvents` when the
/// expected version does not match the stream's current version.
let createInMemoryEventStore<'TEvent, 'TError> (versionError:'TError) =
    let initState : Map<StreamId, 'TEvent list> = Map.empty

    // Events currently stored for a stream; a missing stream counts as empty.
    let eventsOf map streamId =
        match Map.tryFind streamId map with
        | Some events -> events
        | None -> []

    // Current version of a stream = number of stored events (0 when missing).
    let versionOf map streamId =
        eventsOf map streamId |> List.length |> StreamVersion

    // Appends only when the caller's expected version matches the current one.
    // This check also applies to streams that do not exist yet, so a first
    // write must expect version 0 (the version GetVersion/GetEvents report
    // for a missing stream); previously a first write bypassed the check.
    let saveEventsInMap map streamId expectedVersion events =
        if versionOf map streamId = expectedVersion then
            (Saved, map |> Map.add streamId (eventsOf map streamId @ events))
        else
            (VersionConflict, map)

    // Reads never modify the map; it is returned unchanged as the next state.
    let getEventsInMap map streamId = Map.tryFind streamId map, map
    let getVersion map streamId = versionOf map streamId, map

    let agent = createEventStoreAgent initState getVersion getEventsInMap saveEventsInMap
    createEventStore<'TEvent, 'TError> versionError agent