Migrate from Cats Effect 2 to Cats Effect 3#24
Migrate from Cats Effect 2 to Cats Effect 3#24halotukozak wants to merge 2 commits intoseries/3.x-avsfrom
Conversation
Upgrade cats-effect from 2.5.5 to 3.5.7, cats from 2.7.0 to 2.12.0, and fs2 from 2.5.11 to 3.11.0. This is a major migration touching all modules. Key source changes: - Replace Concurrent/ConcurrentEffect/Effect with Async (CE3 unification) - Replace ContextShift/Timer with Async/Temporal - Replace Bracket/ExitCase with MonadCancel; use monix.execution.ExitCase - Implement Async[Task] (CatsAsyncForTask) with CE3 GenSpawn/Fiber/Outcome - Implement Sync[Coeval] (CatsSyncForCoeval) with CE3 Clock/CancelScope - Fix AsyncUtils.cancelable: .as(None) was evaluating cancel tokens immediately - Fix Semaphore.make: same .as(None) cancel token bug - Fix FutureLift.startAsync: async_ is non-cancelable in CE3, use async - Fix CatsAsyncForTask.deferred: infinite recursion via Deferred.apply - Fix TaskLike.fromIO: infinite recursion via Task.from - Fix TaskApp: propagate custom Options to runAsyncOpt - Remove CatsEffectForTask (ConcurrentEffect no longer exists) - Remove TaskEffect (Effect no longer exists) Test adaptations for CE3: - Replace TestScheduler + unsafeToFuture() + .value pattern with unsafeRunSync()/Await.result (CE3 IO runs on its own runtime) - Replace IO(...) with Task.eval(...) in TestScheduler-based reactive tests (IO no longer cooperates with TestScheduler in CE3) - Use unsafeRunTimed in ConcurrentChannel/Queue JVM suites to prevent test runner hangs from dangling async callbacks - Ignore 2 bracket acquire cancelation race tests incompatible with CE3 fiber scheduling model under TestScheduler Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…treams TCK The reactive-streams TCK tests require scalatestplus-testng for the TestNG/ScalaTest bridge, and CE3's Iterant.toReactivePublisher now requires an implicit Dispatcher[Task]. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Code reviewFound 3 issues:
monix/monix-eval/shared/src/main/scala/monix/eval/TaskApp.scala Lines 79 to 92 in b8cd791 🤖 Generated with Claude Code - If this code review was useful, please react with 👍. Otherwise, react with 👎. |
There was a problem hiding this comment.
Pull request overview
This PR migrates Monix’s Cats Effect integration from CE2 to CE3 across core modules and tests, updating typeclass usage and adapting runtime/cancelation semantics.
Changes:
- Upgraded cats-effect/cats/fs2 to CE3-era versions and replaced CE2 typeclasses (e.g.,
Effect,ContextShift,Timer) with CE3 equivalents (Async,Temporal,MonadCancel,Outcome). - Refactored Monix integrations (Task/Iterant/Observable, catnap primitives, conversions) to align with CE3 cancelation/clock/sleep APIs.
- Updated test suites to reflect CE3 runtime behavior, and added reactive-streams TCK support dependencies.
Reviewed changes
Copilot reviewed 128 out of 129 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| reactiveTests/src/test/scala/monix/reactiveTests/IterantToPublisherTest.scala | Adds CE3 Dispatcher[Task] for reactive publisher tests |
| project/MimaFilters.scala | Clears MiMa filters for 4.x series |
| monix-tail/shared/src/test/scala/monix/tail/IterantToReactivePublisherSuite.scala | Updates publisher tests to CE3 Async + Dispatcher |
| monix-tail/shared/src/test/scala/monix/tail/IterantOnErrorSuite.scala | Imports CE3 global runtime for IO usage in tests |
| monix-tail/shared/src/test/scala/monix/tail/IterantFromReactivePublisherSuite.scala | Adds Dispatcher[IO] for CE3 reactive publisher conversions |
| monix-tail/shared/src/test/scala/monix/tail/IterantChannelSuite.scala | Removes CE2 ContextShift/Timer usage |
| monix-tail/shared/src/test/scala/monix/tail/IterantBasicSuite.scala | Imports CE3 global runtime for IO usage in tests |
| monix-tail/shared/src/test/scala/monix/tail/IntervalIntervalSuite.scala | Introduces custom IORuntime backed by TestScheduler for virtual time IO tests |
| monix-tail/shared/src/test/scala/monix/tail/BaseLawsSuite.scala | Removes CE2 laws parameters customization |
| monix-tail/shared/src/main/scala/monix/tail/IterantBuilders.scala | Drops Timer/ContextShift requirements; moves to Async |
| monix-tail/shared/src/main/scala/monix/tail/Iterant.scala | Updates consume/reactive publisher APIs to Async + Dispatcher; refactors fromResource |
| monix-tail/shared/src/main/scala/monix/tail/internal/package.scala | Adds Outcome -> ExitCase conversion for CE3 guarantees/brackets |
| monix-tail/shared/src/main/scala/monix/tail/internal/IterantToReactivePublisher.scala | Reworks reactive publisher loop to CE3 Async + Dispatcher cancelation model |
| monix-tail/shared/src/main/scala/monix/tail/internal/IterantIntervalWithFixedDelay.scala | Uses Async.sleep instead of CE2 Timer.sleep |
| monix-tail/shared/src/main/scala/monix/tail/internal/IterantIntervalAtFixedRate.scala | Uses Async.monotonic + Async.sleep instead of CE2 Clock/Timer |
| monix-tail/shared/src/main/scala/monix/tail/internal/IterantFromReactivePublisher.scala | Switches to async_ for CE3-style async boundaries |
| monix-tail/shared/src/main/scala/monix/tail/internal/IterantDump.scala | Migrates guaranteeCase pattern matching to CE3 Outcome |
| monix-tail/shared/src/main/scala/monix/tail/internal/IterantConsume.scala | Requires Async instead of Concurrent + ContextShift |
| monix-reactive/shared/src/test/scala/monix/reactive/TypeClassLawsForObservableSuite.scala | Updates laws to MonadError instead of CE2 BracketTests |
| monix-reactive/shared/src/test/scala/monix/reactive/ObservableLikeConversionsSuite.scala | Adapts IO tests to CE3 runtime behavior (async completion) |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/ScanTaskSuite.scala | Replaces IO-based effects with Task for CE3 migration |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/ScanEffectSuite.scala | Replaces IO usage with Task in scan-effect tests |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/PublishSelectorSuite.scala | Replaces IO usage with Task for start hooks |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/MapEffectSuite.scala | Replaces IO usage with Task in effectful map tests |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/GuaranteeCaseSuite.scala | Migrates from CE2 ExitCase to Monix ExitCase + Task |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/DoOnSubscribeSuite.scala | Replaces IO usage with Task for CE3 |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/DoOnStartSuite.scala | Replaces IO usage with Task for CE3 |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/DoOnNextSuite.scala | Replaces IO usage with Task for CE3 |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/DoOnNextAckSuite.scala | Replaces IO usage with Task for CE3 |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/DoOnErrorSuite.scala | Replaces IO usage with Task for CE3 |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/DoOnEarlyStopSuite.scala | Replaces IO usage with Task for CE3 |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/operators/DoOnCompleteSuite.scala | Replaces IO usage with Task for CE3 |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/builders/UnfoldEvalObservableSuite.scala | Replaces IO usage with Task for CE3 |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/builders/ResourceCaseObservableSuite.scala | Migrates CE2 Deferred to MVar and ignores race test under CE3 scheduling |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/builders/RepeatEvalFSuite.scala | Replaces IO usage with Task and updates async builder usage |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/builders/FirstStartedObservableSuite.scala | Replaces IO usage with Task for CE3 |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/builders/CharsReaderObservableSuite.scala | Migrates CE2 ExitCase import to Monix ExitCase |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/builders/CatsConversionsSuite.scala | Replaces IO-based conversions with Task-based equivalents |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/builders/BracketObservableSuite.scala | Migrates CE2 Deferred to MVar and ignores race test under CE3 scheduling |
| monix-reactive/shared/src/test/scala/monix/reactive/internal/builders/AsyncStateActionObservableSuite.scala | Replaces IO async usage with Task.evalAsync |
| monix-reactive/shared/src/test/scala/monix/reactive/consumers/MapEvalConsumerSuite.scala | Replaces IO effects with Task in consumer law tests |
| monix-reactive/shared/src/test/scala/monix/reactive/consumers/HeadOptionConsumerSuite.scala | Replaces IO usage with Task in consumer tests |
| monix-reactive/shared/src/test/scala/monix/reactive/consumers/ForeachAsyncConsumerSuite.scala | Replaces IO usage with Task in foreach consumer tests |
| monix-reactive/shared/src/test/scala/monix/reactive/consumers/FoldLeftTaskConsumerSuite.scala | Replaces IO usage with Task in fold-left consumer tests |
| monix-reactive/shared/src/test/scala/monix/reactive/consumers/FirstNotificationConsumerSuite.scala | Replaces IO usage with Task in notification consumer tests |
| monix-reactive/shared/src/main/scala/monix/reactive/ObservableLike.scala | Updates SyncIO conversion to avoid CE2 IO conversion |
| monix-reactive/shared/src/main/scala/monix/reactive/Observable.scala | Removes CE2 Effect/Bracket coupling; updates resource conversion and instances |
| monix-reactive/shared/src/main/scala/monix/reactive/internal/operators/GuaranteeCaseObservable.scala | Migrates CE2 ExitCase to Monix ExitCase |
| monix-reactive/shared/src/main/scala/monix/reactive/internal/operators/ConcatMapObservable.scala | Migrates CE2 ExitCase to Monix ExitCase |
| monix-reactive/shared/src/main/scala/monix/reactive/internal/deprecated/ObservableDeprecatedMethods.scala | Adjusts deprecated API to depend on TaskLike instead of CE2 Effect |
| monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/ResourceCaseObservable.scala | Migrates CE2 ExitCase to Monix ExitCase |
| monix-execution/shared/src/main/scala/monix/execution/ExitCase.scala | Introduces Monix-local ExitCase ADT to replace CE2 ExitCase usage |
| monix-eval/shared/src/test/scala/monix/eval/TypeClassLawsForTaskWithCallbackSuite.scala | Disables CE2 law tests; notes CE3 law infra TODO |
| monix-eval/shared/src/test/scala/monix/eval/TypeClassLawsForTaskSuite.scala | Disables CE2 law tests; notes CE3 law infra TODO |
| monix-eval/shared/src/test/scala/monix/eval/TypeClassLawsForCoevalSuite.scala | Switches to CE3 SyncTests scaffolding (still TODO to enable) |
| monix-eval/shared/src/test/scala/monix/eval/TaskLikeConversionsSuite.scala | Removes CE2 custom-effect conversion tests |
| monix-eval/shared/src/test/scala/monix/eval/TaskLiftSuite.scala | Updates IO conversion tests for CE3 runtime behavior |
| monix-eval/shared/src/test/scala/monix/eval/TaskConversionsKSuite.scala | Removes CE2 ContextShift dependencies in tests |
| monix-eval/shared/src/test/scala/monix/eval/TaskClockTimerAndContextShiftSuite.scala | Migrates clock/timer/context-shift tests to CE3 Temporal/Async |
| monix-eval/shared/src/test/scala/monix/eval/TaskCancelableSuite.scala | Migrates CE2 ExitCase import to Monix ExitCase |
| monix-eval/shared/src/test/scala/monix/eval/TaskBracketSuite.scala | Updates Deferred import for CE3 |
| monix-eval/shared/src/test/scala/monix/eval/CoevalCatsConversions.scala | Imports CE3 global runtime for IO usage in tests |
| monix-eval/shared/src/test/scala/monix/eval/BaseLawsSuite.scala | Removes CE2 laws arbitraries/cogens; adapts async generators to CE3 async_ |
| monix-eval/jvm/src/test/scala/monix/eval/TypeClassLawsForTaskRunSyncUnsafeSuite.scala | Starts CE3 AsyncTests scaffolding (still TODO to enable) |
| monix-eval/jvm/src/test/scala/monix/eval/TaskLocalJVMSuite.scala | Imports CE3 global runtime for IO usage in tests |
| monix-eval/shared/src/main/scala/monix/eval/TaskLike.scala | Reworks TaskLike instances around CE3 (notably IO/SyncIO) |
| monix-eval/shared/src/main/scala/monix/eval/TaskLift.scala | Switches Task -> IO conversion to depend on Async[Task] |
| monix-eval/shared/src/main/scala/monix/eval/TaskApp.scala | Reimplements TaskApp using IOApp + CE3-style async bridging |
| monix-eval/shared/src/main/scala/monix/eval/package.scala | Updates deprecated TaskSemaphore to use monix-catnap semaphore |
| monix-eval/shared/src/main/scala/monix/eval/internal/UnsafeCancelUtils.scala | Replaces CE2 CancelToken with Task[Unit] cancel token |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskRunLoop.scala | Replaces CE2 CancelToken return types with Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskRaceList.scala | Replaces CE2 CancelToken usage with Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskParSequenceUnordered.scala | Converts cancel token buffers to Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskParSequenceN.scala | Migrates CE2 ExitCase import to Monix ExitCase |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskParSequence.scala | Converts cancel token buffers to Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskDeprecated.scala | Updates deprecated APIs to CE3 typeclasses |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskCreate.scala | Migrates cancelable builders from CE2 CancelToken to Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskConversions.scala | Reimplements Task conversions to CE3 IO semantics |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskConnectionRef.scala | Replaces CE2 CancelToken storage with Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskConnectionComposite.scala | Replaces CE2 CancelToken aggregation with Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskConnection.scala | Replaces CE2 CancelToken stack with Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskCancellation.scala | Replaces CE2 CancelToken return type with Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/TaskBracket.scala | Migrates CE2 ExitCase to Monix ExitCase |
| monix-eval/shared/src/main/scala/monix/eval/internal/ForwardCancelable.scala | Replaces CE2 CancelToken placeholder with Task[Unit] |
| monix-eval/shared/src/main/scala/monix/eval/internal/CoevalBracket.scala | Migrates CE2 ExitCase to Monix ExitCase |
| monix-eval/shared/src/main/scala/monix/eval/instances/CatsSyncForCoeval.scala | Implements CE3 Sync[Coeval] (replaces CE2 SyncEffect) |
| monix-eval/shared/src/main/scala/monix/eval/instances/CatsParallelForTask.scala | Updates Parallel instance to use CE3 CatsAsyncForTask |
| monix-eval/shared/src/main/scala/monix/eval/Fiber.scala | Removes CE2 cats.effect.Fiber inheritance; uses Task[Unit] cancel token |
| monix-eval/shared/src/main/scala/monix/eval/Coeval.scala | Migrates CE2 ExitCase import to Monix ExitCase |
| monix-catnap/shared/src/test/scala/monix/catnap/TestSchedulerEffectSuite.scala | Updates SchedulerEffect tests to CE3 duration-based clock/sleep |
| monix-catnap/shared/src/test/scala/monix/catnap/ReferenceSchedulerEffectSuite.scala | Updates SchedulerEffect tests to CE3 duration-based clock/sleep |
| monix-catnap/shared/src/test/scala/monix/catnap/Overrides.scala | Reworks IO overrides for CE3 (delegating to standard instances) |
| monix-catnap/shared/src/test/scala/monix/catnap/ConcurrentQueueSuite.scala | Removes CE2 shift/timer/contextshift usage; updates fiber joins |
| monix-catnap/shared/src/test/scala/monix/catnap/ConcurrentChannelSuite.scala | Removes CE2 shift/timer/contextshift usage; updates fiber joins |
| monix-catnap/shared/src/test/scala/monix/catnap/CircuitBreakerSuite.scala | Removes explicit Clock/Timer wiring; uses CE3 cede and Sync clock |
| monix-catnap/shared/src/test/scala/monix/catnap/cancelables/SingleAssignCancelableFSuite.scala | Imports CE3 global runtime for IO usage in tests |
| monix-catnap/shared/src/test/scala/monix/catnap/cancelables/BooleanCancelableFSuite.scala | Imports CE3 global runtime for IO usage in tests |
| monix-catnap/shared/src/test/scala/monix/catnap/cancelables/AssignableCancelableFSuite.scala | Imports CE3 global runtime for IO usage in tests |
| monix-catnap/shared/src/test/scala/monix/catnap/CancelableFSuite.scala | Imports CE3 global runtime for IO usage in tests |
| monix-catnap/shared/src/main/scala/monix/catnap/Semaphore.scala | Migrates to CE3 cats.effect.std.Semaphore API and Async |
| monix-catnap/shared/src/main/scala/monix/catnap/SchedulerEffect.scala | Replaces CE2 Clock/Timer/ContextShift derivations with CE3 sleep + duration clocks |
| monix-catnap/shared/src/main/scala/monix/catnap/internal/QueueHelpers.scala | Switches to Async + cede and CE3 async cancelability |
| monix-catnap/shared/src/main/scala/monix/catnap/internal/AsyncUtils.scala | Reimplements cancelable builder using CE3 async cancel tokens |
| monix-catnap/shared/src/main/scala/monix/catnap/FutureLift.scala | Drops Concurrent support; uses CE3 Async cancel tokens |
| monix-catnap/shared/src/main/scala/monix/catnap/ConcurrentQueue.scala | Switches queue implementation constraints to Async |
| monix-catnap/shared/src/main/scala/monix/catnap/ConcurrentChannel.scala | Switches channel implementation constraints to Async |
| monix-catnap/shared/src/main/scala/monix/catnap/CircuitBreaker.scala | Migrates from explicit Clock to Sync clock and CE3 Outcome |
| monix-catnap/shared/src/main/scala/monix/catnap/cancelables/SingleAssignCancelableF.scala | Replaces CE2 CancelToken types with F[Unit] |
| monix-catnap/shared/src/main/scala/monix/catnap/cancelables/BooleanCancelableF.scala | Replaces CE2 CancelToken types with F[Unit] |
| monix-catnap/shared/src/main/scala/monix/catnap/cancelables/AssignableCancelableF.scala | Replaces CE2 CancelToken types with F[Unit] |
| monix-catnap/shared/src/main/scala/monix/catnap/CancelableF.scala | Replaces CE2 CancelToken types with F[Unit] throughout |
| monix-catnap/jvm/src/test/scala/monix/catnap/FutureLiftJava8Suite.scala | Updates CompletableFuture lifting tests to CE3 runtime (Await/unsafeToFutureCancelable) |
| monix-catnap/jvm/src/test/scala/monix/catnap/ConcurrentQueueJVMSuite.scala | Switches async tests to synchronous unsafeRunTimed timeouts |
| monix-catnap/jvm/src/test/scala/monix/catnap/ConcurrentChannelJVMSuite.scala | Switches async tests to synchronous unsafeRunTimed timeouts |
| monix-catnap/jvm/src/test/scala/monix/catnap/CatsEffectIssue380Suite.scala | Updates issue regression tests to CE3 runtime + IO.sleep |
| monix-catnap/jvm/src/main/scala/monix/catnap/internal/FutureLiftForPlatform.scala | Drops Concurrent lifting; uses CE3 Async cancel tokens |
| build.sbt | Updates dependency versions, disables MiMa/doctest generation, adds TCK test dependency |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def toIO[A](source: Task[A])(implicit async: Async[Task]): IO[A] = | ||
| source match { | ||
| case Task.Now(value) => IO.pure(value) | ||
| case Task.Error(e) => IO.raiseError(e) | ||
| case Task.Eval(thunk) => IO(thunk()) | ||
| case _ => | ||
| IO.cancelable { cb => | ||
| toIO(eff.runCancelable(source)(r => { cb(r); IO.unit }).unsafeRunSync()) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Implementation for `Task#toConcurrent`. | ||
| */ | ||
| def toConcurrent[F[_], A](source: Task[A])(implicit F: Concurrent[F], eff: ConcurrentEffect[Task]): F[A] = | ||
| source match { | ||
| case Task.Now(value) => F.pure(value) | ||
| case Task.Error(e) => F.raiseError(e) | ||
| case Task.Eval(thunk) => F.delay(thunk()) | ||
| case _ => | ||
| F.cancelable { cb => | ||
| val token = eff.runCancelable(source)(r => { cb(r); IO.unit }).unsafeRunSync() | ||
| toConcurrent(token)(F, eff) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Implementation for `Task#toAsync`. | ||
| */ | ||
| def toAsync[F[_], A](source: Task[A])(implicit F: Async[F], eff: Effect[Task]): F[A] = | ||
| source match { | ||
| case Task.Now(value) => F.pure(value) | ||
| case Task.Error(e) => F.raiseError(e) | ||
| case Task.Eval(thunk) => F.delay(thunk()) | ||
| case task => | ||
| F.async { cb => | ||
| eff.runAsync(task)(r => { cb(r); IO.unit }).unsafeRunSync() | ||
| IO.async_ { cb => | ||
| // Run the task and feed results into the IO callback | ||
| implicit val s = monix.execution.Scheduler.global | ||
| source.runAsync { | ||
| case Right(a) => cb(Right(a)) | ||
| case Left(e) => cb(Left(e)) | ||
| } | ||
| () | ||
| } |
There was a problem hiding this comment.
TaskConversions.toIO builds an IO using IO.async_ and calls source.runAsync, but it ignores the returned Cancelable. This makes the resulting IO non-cancelable (cancellation won’t stop the underlying Task) and can leak work/resources.
Also, it hardcodes Scheduler.global and bypasses Task.Options, so conversions won’t respect caller-provided scheduler/options (e.g., local context propagation, auto-cancelable run-loops).
Recommendation: build the IO with IO.async / IO.cancelable and wire cancellation to the Cancelable returned by runAsyncOpt, and ensure the scheduler/options used come from parameters/implicits rather than hardcoding globals.
| implicit val fromIO: TaskLike[IO] = | ||
| new TaskLike[IO] { | ||
| def apply[A](fa: IO[A]): Task[A] = | ||
| Concurrent.liftIO[Task, A](fa) | ||
| Task.async { cb => | ||
| import cats.effect.unsafe.implicits.global | ||
| fa.unsafeRunAsync { | ||
| case Right(a) => cb.onSuccess(a) | ||
| case Left(e) => cb.onError(e) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
TaskLike.fromIO uses Task.async and IO.unsafeRunAsync, but it does not provide any cancellation linkage. Since IO is cancelable, this violates the Scaladoc promise to preserve interruptibility and can lead to leaked fibers/work when the resulting Task is canceled.
Recommendation: construct a cancelable Task (e.g., via Task.cancelable0) and use IO.unsafeToFutureCancelable() (or equivalent) to obtain a cancel token that cancels the IO fiber when the Task is canceled, instead of dropping cancellation entirely.
| val app = new IOApp { | ||
| override implicit lazy val contextShift: ContextShift[IO] = | ||
| SchedulerEffect.contextShift[IO](scheduler)(IO.ioEffect) | ||
| override implicit lazy val timer: Timer[IO] = | ||
| SchedulerEffect.timerLiftIO[IO](scheduler)(IO.ioEffect) | ||
| def run(args: List[String]): IO[ExitCode] = | ||
| self.run(args).to[IO] | ||
| def run(args: List[String]): IO[ExitCode] = { | ||
| val task = self.run(args) | ||
| implicit val s: Scheduler = self.scheduler | ||
| implicit val opts: Task.Options = self.options | ||
| IO.async_[ExitCode] { cb => | ||
| task.runAsyncOpt { | ||
| case Right(exitCode) => cb(Right(exitCode)) | ||
| case Left(e) => cb(Left(e)) | ||
| } | ||
| () | ||
| } |
There was a problem hiding this comment.
TaskApp wraps the Task in IO.async_ and calls task.runAsyncOpt, but discards the returned Cancelable. This makes shutdown cancellation ineffective: if the IOApp cancels the IO (e.g., on SIGINT), the underlying Task will keep running.
Recommendation: use a cancelable IO (e.g., IO.async / IO.cancelable) that returns a cancel token which calls the Cancelable from runAsyncOpt.
| implicit G: cats.effect.kernel.MonadCancel[G, _]): cats.effect.std.Semaphore[G] = | ||
| throw new UnsupportedOperationException("Monix Semaphore does not support mapK") | ||
|
|
There was a problem hiding this comment.
Semaphore.mapK currently throws UnsupportedOperationException. Since mapK is part of the cats.effect.std.Semaphore API, this turns a normally total operation into a runtime failure for downstream users and can break generic code.
Recommendation: implement mapK by delegating each Semaphore operation through the provided FunctionK (or, if truly impossible, avoid extending cats.effect.std.Semaphore and provide an explicit wrapper type that documents the limitation).
| implicit G: cats.effect.kernel.MonadCancel[G, _]): cats.effect.std.Semaphore[G] = | |
| throw new UnsupportedOperationException("Monix Semaphore does not support mapK") | |
| implicit G: cats.effect.kernel.MonadCancel[G, _]): cats.effect.std.Semaphore[G] = { | |
| def mapSemaphoreK[S[_], T[_]]( | |
| source: cats.effect.std.Semaphore[S], | |
| fk: cats.arrow.FunctionK[S, T] | |
| )( | |
| implicit T: cats.effect.kernel.MonadCancel[T, _] | |
| ): cats.effect.std.Semaphore[T] = | |
| new cats.effect.std.Semaphore[T] { | |
| override def acquire: T[Unit] = | |
| fk(source.acquire) | |
| override def tryAcquire: T[Boolean] = | |
| fk(source.tryAcquire) | |
| override def release: T[Unit] = | |
| fk(source.release) | |
| override def available: T[Long] = | |
| fk(source.available) | |
| override def count: T[Long] = | |
| fk(source.count) | |
| override def acquireN(n: Long): T[Unit] = | |
| fk(source.acquireN(n)) | |
| override def tryAcquireN(n: Long): T[Boolean] = | |
| fk(source.tryAcquireN(n)) | |
| override def releaseN(n: Long): T[Unit] = | |
| fk(source.releaseN(n)) | |
| override def permit: cats.effect.kernel.Resource[T, Unit] = | |
| source.permit.mapK(fk) | |
| override def mapK[U[_]](g: cats.arrow.FunctionK[T, U])( | |
| implicit U: cats.effect.kernel.MonadCancel[U, _]): cats.effect.std.Semaphore[U] = | |
| mapSemaphoreK(this, g) | |
| } | |
| mapSemaphoreK(this, f) | |
| } |
| } else if (cb ne null) { | ||
| continue = !parent.state.compareAndSet(current, Await(cb)) | ||
| } else { | ||
| result = F.asyncF(poll) | ||
| result = F.async_[Unit](poll) | ||
| } |
There was a problem hiding this comment.
In Loop.poll, result = F.async_[Unit](poll) is broken: Async.async_ expects a registration function returning Unit, but poll(cb) returns an F[Unit] which will be discarded (not executed). This means the callback is never registered/invoked, so the stream can hang waiting for demand.
Recommendation: use F.async (or F.async_ { cb => ... }) in a way that actually evaluates the poll(cb) effect (e.g., F.async[Unit](cb => poll(cb).as(None))), or refactor poll so the registration side-effects happen in Unit rather than returning an unevaluated effect.
Code reviewFound 7 issues:
monix/monix-eval/shared/src/main/scala/monix/eval/TaskApp.scala Lines 82 to 92 in 54bdc4c
monix/monix-eval/shared/src/main/scala/monix/eval/TaskLike.scala Lines 113 to 123 in 54bdc4c
🤖 Generated with Claude Code - If this code review was useful, please react with 👍. Otherwise, react with 👎. |
Summary
Async[Task]andSync[Coeval]with full CE3 typeclass hierarchy (GenSpawn, Fiber, Outcome, Clock, CancelScope)AsyncUtils,Semaphore,FutureLift)CatsAsyncForTask.deferredandTaskLike.fromIOTest results
Notable changes
Source (net -1895 lines):
CatsEffectForTaskandTaskEffect(CE2-only)CatsAsyncForTasknow implements full CE3Async[Task]CatsSyncForCoevalnow implements CE3Sync[Coeval]FutureLift.startAsync:async_→async(CE3 cancelability fix)AsyncUtils.cancelable:.as(None)bug was immediately evaluating cancel tokensTaskApp: propagate custom Options viarunAsyncOptTests:
IO.unsafeToFuture()runs on its own global runtime, not TestScheduler — replaced synchronous.valuechecks withAwait.result/unsafeRunSyncIO(...)withTask.eval(...)in TestScheduler-based tests🤖 Generated with Claude Code