Skip to content

Commit 9252170

Browse files
authored
[All] Feat: Delegates listeners (#58)
1 parent 0cfb748 commit 9252170

File tree

7 files changed

+289
-136
lines changed

7 files changed

+289
-136
lines changed

common/src/main/kotlin/com/lambda/event/EventFlow.kt

Lines changed: 119 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,20 @@ import com.lambda.event.callback.ICancellable
55
import com.lambda.event.listener.Listener
66
import com.lambda.threading.runConcurrent
77
import com.lambda.threading.runSafe
8-
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.CoroutineScope
9+
import kotlinx.coroutines.Dispatchers
10+
import kotlinx.coroutines.Job
11+
import kotlinx.coroutines.SupervisorJob
912
import kotlinx.coroutines.channels.BufferOverflow
10-
import kotlinx.coroutines.flow.*
13+
import kotlinx.coroutines.flow.Flow
14+
import kotlinx.coroutines.flow.MutableSharedFlow
15+
import kotlinx.coroutines.flow.filter
16+
import kotlinx.coroutines.flow.filterIsInstance
17+
import kotlinx.coroutines.flow.filterNot
18+
import kotlinx.coroutines.flow.first
19+
import kotlinx.coroutines.flow.flow
20+
import kotlinx.coroutines.runBlocking
21+
import kotlinx.coroutines.withTimeout
1122

1223

1324
/**
@@ -28,12 +39,35 @@ object EventFlow {
2839
* useful when you have multiple independent [Job]s running in parallel.
2940
*/
3041
val lambdaScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
42+
43+
/**
44+
* [concurrentFlow] is a [MutableSharedFlow] of [Event]s with a buffer capacity to handle event emissions.
45+
*
46+
* Events emitted to this flow are processed by concurrent listeners, allowing for parallel event handling.
47+
*
48+
* The buffer overflow strategy is set to [BufferOverflow.DROP_OLDEST], meaning that when the buffer is full,
49+
* the oldest event will be dropped to accommodate a new event.
50+
*/
3151
val concurrentFlow = MutableSharedFlow<Event>(
3252
extraBufferCapacity = 1000,
3353
onBufferOverflow = BufferOverflow.DROP_OLDEST
3454
)
3555

56+
/**
57+
* [syncListeners] is a [Subscriber] that manages synchronous listeners.
58+
*
59+
* These listeners will be executed immediately when an event is posted, allowing for immediate responses to events.
60+
* The [syncListeners] are stored in a [Subscriber] object, which is a specialized [ConcurrentHashMap] that manages sets of [Listener]s for different [Event] types.
61+
*/
3662
val syncListeners = Subscriber()
63+
64+
/**
65+
* [concurrentListeners] is a [Subscriber] that manages asynchronous listeners.
66+
*
67+
* These listeners will be executed in parallel, each on a dedicated coroutine,
68+
* allowing for concurrent processing of events.
69+
* The [concurrentListeners] are stored in a [Subscriber] object, which is a specialized [ConcurrentHashMap] that manages sets of [Listener]s for different [Event] types.
70+
*/
3771
val concurrentListeners = Subscriber()
3872

3973
init {
@@ -49,28 +83,60 @@ object EventFlow {
4983
}
5084
}
5185

52-
suspend inline fun <reified E : Event> awaitEvent(
86+
/**
87+
* Suspends until an event of type [E] is received that satisfies the given [predicate].
88+
*
89+
* @param E The type of the event to wait for. This should be a subclass of [Event].
90+
* @param predicate A lambda to test if the event satisfies the condition.
91+
* @return The first event that matches the predicate.
92+
*/
93+
suspend inline fun <reified E : Event> blockUntilEvent(
5394
noinline predicate: SafeContext.(E) -> Boolean = { true },
5495
) = concurrentFlow.filterIsInstance<E>().first {
5596
runSafe {
5697
predicate(it)
5798
} ?: false
5899
}
59100

60-
suspend inline fun <reified E : Event> awaitEventUnsafe(
61-
noinline predicate: (E) -> Boolean = { true },
62-
) = concurrentFlow.filterIsInstance<E>().first(predicate)
63-
64-
suspend inline fun <reified E : Event> awaitEvent(
101+
/**
102+
* Suspends until an event of type [E] is received that satisfies the given [predicate],
103+
* or until the specified [timeout] occurs.
104+
*
105+
* @param E The type of the event to wait for. This should be a subclass of [Event].
106+
* @param timeout The maximum time to wait for the event, in milliseconds.
107+
* @param predicate A lambda to test if the event satisfies the condition.
108+
* @return The first event that matches the predicate or throws a timeout exception if not found.
109+
*/
110+
suspend inline fun <reified E : Event> blockUntilEvent(
65111
timeout: Long,
66112
noinline predicate: (E) -> Boolean = { true },
67113
) = runBlocking {
68-
withTimeout(timeout) {
69-
concurrentFlow.filterIsInstance<E>().first(predicate)
70-
}
114+
withTimeout(timeout) {
115+
concurrentFlow.filterIsInstance<E>().first(predicate)
71116
}
117+
}
72118

73-
suspend inline fun <reified E : Event> awaitEvents(
119+
/**
120+
* Suspends until an event of type [E] is received that satisfies the given [predicate].
121+
*
122+
* This method is "unsafe" in the sense that it does not execute the predicate within a [SafeContext].
123+
*
124+
* @param E The type of the event to wait for. This should be a subclass of [Event].
125+
* @param predicate A lambda to test if the event satisfies the condition.
126+
* @return The first event that matches the predicate.
127+
*/
128+
suspend inline fun <reified E : Event> blockUntilUnsafeEvent(
129+
noinline predicate: (E) -> Boolean = { true },
130+
) = concurrentFlow.filterIsInstance<E>().first(predicate)
131+
132+
/**
133+
* Returns a [Flow] of events of type [E] that satisfy the given [predicate].
134+
*
135+
* @param E The type of the event to filter. This should be a subclass of [Event].
136+
* @param predicate A lambda to test if the event satisfies the condition.
137+
* @return A [Flow] emitting events that match the predicate.
138+
*/
139+
suspend inline fun <reified E : Event> collectEvents(
74140
crossinline predicate: (E) -> Boolean = { true },
75141
): Flow<E> = flow {
76142
concurrentFlow
@@ -149,23 +215,58 @@ object EventFlow {
149215
concurrentListeners.remove(T::class)
150216
}
151217

152-
private fun Event.executeListenerSynchronous() {
153-
syncListeners[this::class]?.forEach { listener ->
218+
/**
219+
* Executes the listeners for the current event type synchronously.
220+
*
221+
* This method retrieves the list of synchronous listeners for the event's class
222+
* and invokes their [Listener.execute] method if the listener should be notified.
223+
*
224+
* @receiver The current event for which listeners are to be executed.
225+
* @param T The type of the event being handled.
226+
*/
227+
private fun <T : Event> T.executeListenerSynchronous() {
228+
syncListeners[this::class]?.forEach {
229+
@Suppress("UNCHECKED_CAST")
230+
val listener = it as? Listener<T> ?: return@forEach
154231
if (shouldNotNotify(listener, this)) return@forEach
155232
listener.execute(this)
156233
}
157234
}
158235

159-
private fun Event.executeListenerConcurrently() {
160-
concurrentListeners[this::class]?.forEach { listener ->
236+
/**
237+
* Executes the listeners for the current event type concurrently.
238+
*
239+
* This method retrieves the list of concurrent listeners for the event's class
240+
* and invokes their [Listener.execute] method if the listener should be notified.
241+
* Each listener is executed on the same coroutine scope.
242+
*
243+
* @receiver The current event for which listeners are to be executed.
244+
* @param T The type of the event being handled.
245+
*/
246+
private fun <T : Event> T.executeListenerConcurrently() {
247+
concurrentListeners[this::class]?.forEach {
248+
@Suppress("UNCHECKED_CAST")
249+
val listener = it as? Listener<T> ?: return@forEach
161250
if (shouldNotNotify(listener, this)) return@forEach
162251
listener.execute(this)
163252
}
164253
}
165254

166-
private fun shouldNotNotify(listener: Listener, event: Event) =
255+
/**
256+
* Determines whether a given [listener] should be notified about an [event].
257+
*
258+
* A listener should not be notified if:
259+
* - The listener's owner is a [Muteable] and is currently muted, unless the listener is set to [alwaysListen].
260+
* - The event is cancellable and has been canceled.
261+
*
262+
* @param listener The listener to check.
263+
* @param event The event being processed.
264+
* @param T The type of the event.
265+
* @return `true` if the listener should not be notified, `false` otherwise.
266+
*/
267+
private fun <T : Event> shouldNotNotify(listener: Listener<T>, event: Event) =
167268
listener.owner is Muteable
168269
&& (listener.owner as Muteable).isMuted
169270
&& !listener.alwaysListen
170271
|| event is ICancellable && event.isCanceled()
171-
}
272+
}

common/src/main/kotlin/com/lambda/event/Subscriber.kt

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,38 +14,34 @@ import kotlin.reflect.KClass
1414
*
1515
* @property defaultListenerSet A [ConcurrentSkipListSet] of [Listener]s, sorted in reverse order.
1616
*/
17-
class Subscriber : ConcurrentHashMap<KClass<*>, ConcurrentSkipListSet<Listener>>() {
18-
val defaultListenerSet: ConcurrentSkipListSet<Listener>
19-
get() = ConcurrentSkipListSet(Comparator.reverseOrder())
17+
class Subscriber : ConcurrentHashMap<KClass<out Event>, ConcurrentSkipListSet<Listener<out Event>>>() {
18+
val defaultListenerSet: ConcurrentSkipListSet<Listener<out Event>>
19+
get() = ConcurrentSkipListSet(Listener.comparator.reversed())
2020

2121

2222
/** Allows a [Listener] to start receiving a specific type of [Event] */
23-
inline fun <reified T : Event> subscribe(listener: Listener) =
23+
inline fun <reified T : Event> subscribe(listener: Listener<T>) =
2424
getOrPut(T::class) { defaultListenerSet }.add(listener)
2525

26-
27-
/** Forgets about every [Listener]s association to [eventType] */
28-
fun unsubscribe(eventType: KClass<*>) = remove(eventType)
29-
30-
/** Allows a [Listener] to stop receiving a specific type of [Event] */
31-
fun unsubscribe(listener: Listener) {
32-
values.forEach { listeners ->
33-
listeners.remove(listener)
34-
}
35-
}
36-
3726
/** Allows a [Subscriber] to start receiving all [Event]s of another [Subscriber]. */
3827
infix fun subscribe(subscriber: Subscriber) {
3928
subscriber.forEach { (eventType, listeners) ->
4029
getOrPut(eventType) { defaultListenerSet }.addAll(listeners)
4130
}
4231
}
4332

33+
/** Forgets about every [Listener]'s association to [eventType] */
34+
fun <T : Event> unsubscribe(eventType: KClass<T>) =
35+
remove(eventType)
36+
37+
/** Allows a [Listener] to stop receiving a specific type of [Event] */
38+
inline fun <reified T : Event> unsubscribe(listener: Listener<T>) =
39+
getOrElse(T::class) { defaultListenerSet }.remove(listener)
40+
4441
/** Allows a [Subscriber] to stop receiving all [Event]s of another [Subscriber] */
4542
infix fun unsubscribe(subscriber: Subscriber) {
46-
entries.removeAll { (eventType, listeners) ->
47-
subscriber[eventType]?.let { listeners.removeAll(it) }
48-
listeners.isEmpty()
43+
subscriber.forEach { (eventType, listeners) ->
44+
getOrElse(eventType) { defaultListenerSet }.removeAll(listeners)
4945
}
5046
}
51-
}
47+
}

common/src/main/kotlin/com/lambda/event/listener/Listener.kt

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import com.lambda.module.Module
2727
* @property owner The owner of the [Listener]. This is typically the object that created the [Listener].
2828
* @property alwaysListen If true, the [Listener] will always be triggered, even if the [owner] is [Muteable.isMuted].
2929
*/
30-
abstract class Listener : Comparable<Listener> {
30+
abstract class Listener<T : Event> : Comparable<Listener<T>> {
3131
abstract val priority: Int
3232
abstract val owner: Any
3333
abstract val alwaysListen: Boolean
@@ -37,21 +37,17 @@ abstract class Listener : Comparable<Listener> {
3737
*
3838
* @param event The event that triggered this listener.
3939
*/
40-
abstract fun execute(event: Event)
40+
abstract fun execute(event: T)
4141

42-
/**
43-
* Compares this listener with another listener.
44-
* The comparison is based first on the priority, and then on the hash code of the listeners.
45-
*
46-
* @param other The other listener to compare with.
47-
* @return A negative integer, zero, or a positive integer as this listener is less than, equal to,
48-
* or greater than the specified listener.
49-
*/
50-
override fun compareTo(other: Listener) =
51-
compareBy<Listener> {
42+
override fun compareTo(other: Listener<T>) =
43+
comparator.compare(this, other)
44+
45+
companion object {
46+
val comparator = compareBy<Listener<out Event>> {
5247
it.priority
5348
}.thenBy {
54-
// Needed because ConcurrentSkipListSet handles insertion based on compareTo
49+
// Hashcode is needed because ConcurrentSkipListSet handles insertion based on compareTo
5550
it.hashCode()
56-
}.compare(this, other)
57-
}
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)