Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions common/src/main/kotlin/com/lambda/event/EventFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,32 @@ object EventFlow {
}
}

/**
* Registers a timer that invokes the specified block at regular intervals
*
* This function runs a timer that emits a signal at the specified interval, and invokes the
* provided block each time the signal is emitted. The interval is specified in milliseconds,
* and the block is a suspend function that is called each time the timer triggers
*
* **Note**: This function uses a flow to emit signals at the specified interval, and uses `conflate()`
* to prevent overloading the block execution in case the block takes longer than the interval
*
* @param interval The time interval (in milliseconds) between each invocation of the block
* @param block The suspend function that will be called each time the timer triggers
*
* @return A [Job] representing the running timer. The timer can be canceled by invoking `cancel()` on the returned job
*/
inline fun timer(interval: Long, crossinline block: suspend () -> Unit) =
runConcurrent {
// This allows for 18,446,744,073,709,551,615 events
// Don't worry, a 10 ms timer will end after 5.849 billion years
(Long.MAX_VALUE downTo Long.MIN_VALUE)
.asFlow()
.onEach { delay(interval) }
.conflate()
.collect { block() }
}

/**
* Suspends until an event of type [E] is received that satisfies the given [predicate].
*
Expand Down Expand Up @@ -123,7 +149,7 @@ object EventFlow {
noinline predicate: (E) -> Boolean = { true },
) = runBlocking {
withTimeout(timeout) {
concurrentFlow.filterIsInstance<E>().first(predicate)
this@EventFlow.concurrentFlow.filterIsInstance<E>().first(predicate)
}
}

Expand All @@ -150,7 +176,7 @@ object EventFlow {
suspend inline fun <reified E : Event> collectEvents(
crossinline predicate: (E) -> Boolean = { true },
): Flow<E> = flow {
concurrentFlow
this@EventFlow.concurrentFlow
.filterIsInstance<E>()
.filter { predicate(it) }
.collect {
Expand All @@ -175,7 +201,7 @@ object EventFlow {
*/
@JvmStatic
fun <E : Event> E.post(): E {
concurrentFlow.tryEmit(this)
this@EventFlow.concurrentFlow.tryEmit(this)
executeListenerSynchronous()
return this@post
}
Expand Down
Loading