Skip to content
137 changes: 119 additions & 18 deletions common/src/main/kotlin/com/lambda/event/EventFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,20 @@ import com.lambda.event.callback.ICancellable
import com.lambda.event.listener.Listener
import com.lambda.threading.runConcurrent
import com.lambda.threading.runSafe
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout


/**
Expand All @@ -28,12 +39,35 @@ object EventFlow {
* useful when you have multiple independent [Job]s running in parallel.
*/
val lambdaScope = CoroutineScope(Dispatchers.Default + SupervisorJob())

/**
* [concurrentFlow] is a [MutableSharedFlow] of [Event]s with a buffer capacity to handle event emissions.
*
* Events emitted to this flow are processed by concurrent listeners, allowing for parallel event handling.
*
* The buffer overflow strategy is set to [BufferOverflow.DROP_OLDEST], meaning that when the buffer is full,
* the oldest event will be dropped to accommodate a new event.
*/
val concurrentFlow = MutableSharedFlow<Event>(
extraBufferCapacity = 1000,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)

/**
* [syncListeners] is a [Subscriber] that manages synchronous listeners.
*
* These listeners will be executed immediately when an event is posted, allowing for immediate responses to events.
* The [syncListeners] are stored in a [Subscriber] object, which is a specialized [ConcurrentHashMap] that manages sets of [Listener]s for different [Event] types.
*/
val syncListeners = Subscriber()

/**
* [concurrentListeners] is a [Subscriber] that manages asynchronous listeners.
*
* These listeners will be executed in parallel, each on a dedicated coroutine,
* allowing for concurrent processing of events.
* The [concurrentListeners] are stored in a [Subscriber] object, which is a specialized [ConcurrentHashMap] that manages sets of [Listener]s for different [Event] types.
*/
val concurrentListeners = Subscriber()

init {
Expand All @@ -49,28 +83,60 @@ object EventFlow {
}
}

suspend inline fun <reified E : Event> awaitEvent(
/**
* Suspends until an event of type [E] is received that satisfies the given [predicate].
*
* @param E The type of the event to wait for. This should be a subclass of [Event].
* @param predicate A lambda to test if the event satisfies the condition.
* @return The first event that matches the predicate.
*/
suspend inline fun <reified E : Event> blockUntilEvent(
noinline predicate: SafeContext.(E) -> Boolean = { true },
) = concurrentFlow.filterIsInstance<E>().first {
runSafe {
predicate(it)
} ?: false
}

suspend inline fun <reified E : Event> awaitEventUnsafe(
noinline predicate: (E) -> Boolean = { true },
) = concurrentFlow.filterIsInstance<E>().first(predicate)

suspend inline fun <reified E : Event> awaitEvent(
/**
* Suspends until an event of type [E] is received that satisfies the given [predicate],
* or until the specified [timeout] occurs.
*
* @param E The type of the event to wait for. This should be a subclass of [Event].
* @param timeout The maximum time to wait for the event, in milliseconds.
* @param predicate A lambda to test if the event satisfies the condition.
* @return The first event that matches the predicate or throws a timeout exception if not found.
*/
suspend inline fun <reified E : Event> blockUntilEvent(
timeout: Long,
noinline predicate: (E) -> Boolean = { true },
) = runBlocking {
withTimeout(timeout) {
concurrentFlow.filterIsInstance<E>().first(predicate)
}
withTimeout(timeout) {
concurrentFlow.filterIsInstance<E>().first(predicate)
}
}

suspend inline fun <reified E : Event> awaitEvents(
/**
* Suspends until an event of type [E] is received that satisfies the given [predicate].
*
* This method is "unsafe" in the sense that it does not execute the predicate within a [SafeContext].
*
* @param E The type of the event to wait for. This should be a subclass of [Event].
* @param predicate A lambda to test if the event satisfies the condition.
* @return The first event that matches the predicate.
*/
suspend inline fun <reified E : Event> blockUntilUnsafeEvent(
noinline predicate: (E) -> Boolean = { true },
) = concurrentFlow.filterIsInstance<E>().first(predicate)

/**
* Returns a [Flow] of events of type [E] that satisfy the given [predicate].
*
* @param E The type of the event to filter. This should be a subclass of [Event].
* @param predicate A lambda to test if the event satisfies the condition.
* @return A [Flow] emitting events that match the predicate.
*/
suspend inline fun <reified E : Event> collectEvents(
crossinline predicate: (E) -> Boolean = { true },
): Flow<E> = flow {
concurrentFlow
Expand Down Expand Up @@ -149,23 +215,58 @@ object EventFlow {
concurrentListeners.remove(T::class)
}

private fun Event.executeListenerSynchronous() {
syncListeners[this::class]?.forEach { listener ->
/**
* Executes the listeners for the current event type synchronously.
*
* This method retrieves the list of synchronous listeners for the event's class
* and invokes their [Listener.execute] method if the listener should be notified.
*
* @receiver The current event for which listeners are to be executed.
* @param T The type of the event being handled.
*/
private fun <T : Event> T.executeListenerSynchronous() {
syncListeners[this::class]?.forEach {
@Suppress("UNCHECKED_CAST")
val listener = it as? Listener<T> ?: return@forEach
if (shouldNotNotify(listener, this)) return@forEach
listener.execute(this)
}
}

private fun Event.executeListenerConcurrently() {
concurrentListeners[this::class]?.forEach { listener ->
/**
* Executes the listeners for the current event type concurrently.
*
* This method retrieves the list of concurrent listeners for the event's class
* and invokes their [Listener.execute] method if the listener should be notified.
* Each listener is executed on the same coroutine scope.
*
* @receiver The current event for which listeners are to be executed.
* @param T The type of the event being handled.
*/
private fun <T : Event> T.executeListenerConcurrently() {
concurrentListeners[this::class]?.forEach {
@Suppress("UNCHECKED_CAST")
val listener = it as? Listener<T> ?: return@forEach
if (shouldNotNotify(listener, this)) return@forEach
listener.execute(this)
}
}

private fun shouldNotNotify(listener: Listener, event: Event) =
/**
* Determines whether a given [listener] should be notified about an [event].
*
* A listener should not be notified if:
* - The listener's owner is a [Muteable] and is currently muted, unless the listener is set to [alwaysListen].
* - The event is cancellable and has been canceled.
*
* @param listener The listener to check.
* @param event The event being processed.
* @param T The type of the event.
* @return `true` if the listener should not be notified, `false` otherwise.
*/
private fun <T : Event> shouldNotNotify(listener: Listener<T>, event: Event) =
listener.owner is Muteable
&& (listener.owner as Muteable).isMuted
&& !listener.alwaysListen
|| event is ICancellable && event.isCanceled()
}
}
34 changes: 15 additions & 19 deletions common/src/main/kotlin/com/lambda/event/Subscriber.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,34 @@ import kotlin.reflect.KClass
*
* @property defaultListenerSet A [ConcurrentSkipListSet] of [Listener]s, sorted in reverse order.
*/
class Subscriber : ConcurrentHashMap<KClass<*>, ConcurrentSkipListSet<Listener>>() {
val defaultListenerSet: ConcurrentSkipListSet<Listener>
get() = ConcurrentSkipListSet(Comparator.reverseOrder())
class Subscriber : ConcurrentHashMap<KClass<out Event>, ConcurrentSkipListSet<Listener<out Event>>>() {
val defaultListenerSet: ConcurrentSkipListSet<Listener<out Event>>
get() = ConcurrentSkipListSet(Listener.comparator.reversed())


/** Allows a [Listener] to start receiving a specific type of [Event] */
inline fun <reified T : Event> subscribe(listener: Listener) =
inline fun <reified T : Event> subscribe(listener: Listener<T>) =
getOrPut(T::class) { defaultListenerSet }.add(listener)


/** Forgets about every [Listener]s association to [eventType] */
fun unsubscribe(eventType: KClass<*>) = remove(eventType)

/** Allows a [Listener] to stop receiving a specific type of [Event] */
fun unsubscribe(listener: Listener) {
values.forEach { listeners ->
listeners.remove(listener)
}
}

/** Allows a [Subscriber] to start receiving all [Event]s of another [Subscriber]. */
infix fun subscribe(subscriber: Subscriber) {
subscriber.forEach { (eventType, listeners) ->
getOrPut(eventType) { defaultListenerSet }.addAll(listeners)
}
}

/** Forgets about every [Listener]'s association to [eventType] */
fun <T : Event> unsubscribe(eventType: KClass<T>) =
remove(eventType)

/** Allows a [Listener] to stop receiving a specific type of [Event] */
inline fun <reified T : Event> unsubscribe(listener: Listener<T>) =
getOrElse(T::class) { defaultListenerSet }.remove(listener)

/** Allows a [Subscriber] to stop receiving all [Event]s of another [Subscriber] */
infix fun unsubscribe(subscriber: Subscriber) {
entries.removeAll { (eventType, listeners) ->
subscriber[eventType]?.let { listeners.removeAll(it) }
listeners.isEmpty()
subscriber.forEach { (eventType, listeners) ->
getOrElse(eventType) { defaultListenerSet }.removeAll(listeners)
}
}
}
}
26 changes: 11 additions & 15 deletions common/src/main/kotlin/com/lambda/event/listener/Listener.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import com.lambda.module.Module
* @property owner The owner of the [Listener]. This is typically the object that created the [Listener].
* @property alwaysListen If true, the [Listener] will always be triggered, even if the [owner] is [Muteable.isMuted].
*/
abstract class Listener : Comparable<Listener> {
abstract class Listener<T : Event> : Comparable<Listener<T>> {
abstract val priority: Int
abstract val owner: Any
abstract val alwaysListen: Boolean
Expand All @@ -37,21 +37,17 @@ abstract class Listener : Comparable<Listener> {
*
* @param event The event that triggered this listener.
*/
abstract fun execute(event: Event)
abstract fun execute(event: T)

/**
* Compares this listener with another listener.
* The comparison is based first on the priority, and then on the hash code of the listeners.
*
* @param other The other listener to compare with.
* @return A negative integer, zero, or a positive integer as this listener is less than, equal to,
* or greater than the specified listener.
*/
override fun compareTo(other: Listener) =
compareBy<Listener> {
override fun compareTo(other: Listener<T>) =
comparator.compare(this, other)

companion object {
val comparator = compareBy<Listener<out Event>> {
it.priority
}.thenBy {
// Needed because ConcurrentSkipListSet handles insertion based on compareTo
// Hashcode is needed because ConcurrentSkipListSet handles insertion based on compareTo
it.hashCode()
}.compare(this, other)
}
}
}
}
Loading