Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 16 additions & 68 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ import sbt.{Def, Global, Tags}

import scala.collection.immutable.SortedSet

val benchmarkProjects = List(
"benchmarksPrev",
"benchmarksNext"
).map(_ + "/compile").mkString(" ;")

val jvmTests = List(
"reactiveTests",
"tracingTests"
Expand All @@ -18,15 +13,15 @@ val jvmTests = List(
addCommandAlias("ci-all", ";ci-jvm ;ci-js ;ci-meta")
addCommandAlias("ci-js", ";clean ;coreJS/Test/compile ;coreJS/test ;coreJS/package")
addCommandAlias("ci-jvm", ";clean ;coreJVM/Test/compile ;coreJVM/test ;coreJVM/package ;tracingTests/test")
addCommandAlias("ci-meta", ";mimaReportBinaryIssues ;unidoc")
addCommandAlias("ci-meta", ";unidoc")
addCommandAlias("ci-release", ";+publishSigned ;sonatypeBundleRelease")

// ------------------------------------------------------------------------------------------------
// Dependencies - Versions

val cats_Version = "2.7.0"
val catsEffect_Version = "2.5.5"
val fs2_Version = "2.5.11"
val cats_Version = "2.12.0"
val catsEffect_Version = "3.5.7"
val fs2_Version = "3.11.0"
val jcTools_Version = "4.0.5"
val reactiveStreams_Version = "1.0.4"
val macrotaskExecutor_Version = "1.0.0"
Expand All @@ -39,7 +34,7 @@ val scalaCompat_Version = "2.7.0"

// The Monix version with which we must keep binary compatibility.
// https://github.com/typesafehub/migration-manager/wiki/Sbt-plugin
val monixSeries = "3.4.0"
val monixSeries = "4.0.0"

// ------------------------------------------------------------------------------------------------
// Dependencies - Libraries
Expand Down Expand Up @@ -387,18 +382,17 @@ lazy val sharedJSSettings = Seq(
)

def mimaSettings(projectName: String) = Seq(
mimaPreviousArtifacts := Set("io.monix" %% projectName % monixSeries),
mimaBinaryIssueFilters ++= MimaFilters.changesFor_3_0_1,
mimaBinaryIssueFilters ++= MimaFilters.changesFor_3_2_0,
mimaBinaryIssueFilters ++= MimaFilters.changesFor_3_3_0,
mimaBinaryIssueFilters ++= MimaFilters.changesFor_3_4_0,
mimaBinaryIssueFilters ++= MimaFilters.changesFor_avs
mimaPreviousArtifacts := Set.empty,
mimaBinaryIssueFilters := Seq.empty
)

lazy val doctestTestSettings = Seq(
doctestTestFramework := DoctestTestFramework.Minitest,
doctestIgnoreRegex := Some(s".*TaskApp.scala|.*reactive.internal.(builders|operators|rstreams).*"),
doctestOnlyCodeBlocksMode := true
doctestOnlyCodeBlocksMode := true,
// Disable doctest generation — scaladoc examples reference CE2 APIs (Timer, ContextShift, etc.)
// that no longer exist in Cats Effect 3. Re-enable after updating the scaladoc examples.
doctestGenTests := Seq.empty
)

// ------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -597,6 +591,7 @@ lazy val executionJS = project
lazy val catnapProfile =
crossModule(
projectName = "monix-catnap",
withDocTests = false,
crossSettings = Seq(
description := "Sub-module of Monix, exposing pure abstractions built on top of the Cats-Effect type classes. See: https://monix.io",
libraryDependencies += catsEffectLib.value
Expand Down Expand Up @@ -710,7 +705,8 @@ lazy val reactiveTests = project
.dependsOn(reactiveJVM, tailJVM)
.settings(
libraryDependencies ++= Seq(
reactiveStreamsTCKLib % Test
reactiveStreamsTCKLib % Test,
"org.scalatestplus" %% "testng-7-5" % "3.2.17.0" % Test
)
)

Expand Down Expand Up @@ -749,53 +745,5 @@ lazy val tracingTests = project
)

// --------------------------------------------
// monix-benchmarks-{prev,next} (not published)

lazy val benchmarksScalaVersions =
Def.setting {
crossScalaVersionsFromBuildYaml.value.toIndexedSeq
.filter(v => !v.value.startsWith("3."))
.map(_.value)
}

lazy val benchmarksPrev = project
.in(file("benchmarks/vprev"))
.enablePlugins(JmhPlugin)
.configure(
monixSubModule(
"monix-benchmarks-prev",
publishArtifacts = false
)
)
.settings(
// Disable Scala 3 (Dotty)
scalaVersion := benchmarksScalaVersions.value.head,
crossScalaVersions := benchmarksScalaVersions.value,
libraryDependencies ++= Seq(
"io.monix" %% "monix" % "3.3.0",
"dev.zio" %% "zio-streams" % "1.0.0",
"co.fs2" %% "fs2-core" % fs2_Version,
"com.typesafe.akka" %% "akka-stream" % "2.6.9"
)
)

lazy val benchmarksNext = project
.in(file("benchmarks/vnext"))
.enablePlugins(JmhPlugin)
.configure(
monixSubModule(
projectName = "monix-benchmarks-next",
publishArtifacts = false
)
)
.dependsOn(reactiveJVM, tailJVM)
.settings(
// Disable Scala 3 (Dotty)
scalaVersion := benchmarksScalaVersions.value.head,
crossScalaVersions := benchmarksScalaVersions.value,
libraryDependencies ++= Seq(
"dev.zio" %% "zio-streams" % "1.0.0",
"co.fs2" %% "fs2-core" % fs2_Version,
"com.typesafe.akka" %% "akka-stream" % "2.6.9"
)
)
// Benchmarks are currently disabled during the CE3 migration.
// See benchmarks/ directory for sources.
Original file line number Diff line number Diff line change
Expand Up @@ -20,66 +20,33 @@ package internal

import java.util.concurrent.{CancellationException, CompletableFuture, CompletionException}
import java.util.function.BiFunction
import cats.effect.{Async, Concurrent}
import cats.effect.Async

private[catnap] abstract class FutureLiftForPlatform {
/**
* Lifts Java's `java.util.concurrent.CompletableFuture` to
* any data type implementing `cats.effect.Concurrent`.
*/
def javaCompletableToConcurrent[F[_], A](fa: F[CompletableFuture[A]])(implicit F: Concurrent[F]): F[A] =
F.flatMap(fa) { cf =>
F.cancelable { cb =>
subscribeToCompletable(cf, cb)
F.delay { cf.cancel(true); () }
}
}

/**
* Lifts Java's `java.util.concurrent.CompletableFuture` to
* any data type implementing `cats.effect.Async`.
*
* The resulting effect is cancelable if the underlying `CompletableFuture` supports it.
*/
def javaCompletableToAsync[F[_], A](fa: F[CompletableFuture[A]])(implicit F: Async[F]): F[A] =
F.flatMap(fa) { cf =>
F.async { cb =>
subscribeToCompletable(cf, cb)
F.pure(Some(F.delay { cf.cancel(true); () }))
}
}

/**
* A generic function that subsumes both [[javaCompletableToConcurrent]]
* and [[javaCompletableToAsync]].
*/
def javaCompletableToConcurrentOrAsync[F[_], A](fa: F[CompletableFuture[A]])(
implicit F: Concurrent[F] OrElse Async[F]): F[A] = {

F.unify match {
case ref: Concurrent[F] @unchecked => javaCompletableToConcurrent(fa)(ref)
case ref => javaCompletableToAsync(fa)(ref)
}
}

/**
* Implicit instance of [[FutureLift]] for converting from
* `java.util.concurrent.CompletableFuture` to any `Concurrent`
* or `Async` data type.
* `java.util.concurrent.CompletableFuture` to any `Async` data type.
*/
implicit def javaCompletableLiftForConcurrentOrAsync[F[_]](
implicit F: Concurrent[F] OrElse Async[F]): FutureLift[F, CompletableFuture] = {

F.unify match {
case ref: Concurrent[F] @unchecked =>
new FutureLift[F, CompletableFuture] {
def apply[A](fa: F[CompletableFuture[A]]): F[A] =
javaCompletableToConcurrent(fa)(ref)
}
case ref =>
new FutureLift[F, CompletableFuture] {
def apply[A](fa: F[CompletableFuture[A]]): F[A] =
javaCompletableToAsync(fa)(ref)
}
implicit def javaCompletableLiftForAsync[F[_]](
implicit F: Async[F]): FutureLift[F, CompletableFuture] =
new FutureLift[F, CompletableFuture] {
def apply[A](fa: F[CompletableFuture[A]]): F[A] =
javaCompletableToAsync(fa)
}
}

private def subscribeToCompletable[A, F[_]](cf: CompletableFuture[A], cb: Either[Throwable, A] => Unit): Unit = {
cf.handle[Unit](new BiFunction[A, Throwable, Unit] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,76 +17,58 @@

package monix.catnap

import java.util.concurrent.Executors
import minitest.SimpleTestSuite
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.implicits._
import monix.execution.atomic.Atomic
import scala.concurrent.{CancellationException, ExecutionContext}
import scala.concurrent.CancellationException
import scala.concurrent.duration._

object CatsEffectIssue380Suite extends SimpleTestSuite {
test("MVar does not block on put — typelevel/cats-effect#380") {
val service = Executors.newSingleThreadScheduledExecutor()
implicit val ec = ExecutionContext.global
implicit val cs = IO.contextShift(ec)
implicit val timer = IO.timer(ec, service)

try {
for (_ <- 0 until 10) {
val cancelLoop = Atomic(false)
val unit = IO {
if (cancelLoop.get()) throw new CancellationException
}
for (_ <- 0 until 10) {
val cancelLoop = Atomic(false)
val unit = IO {
if (cancelLoop.get()) throw new CancellationException
}

try {
val task = for {
mv <- MVar[IO].empty[Unit]()
_ <- (mv.take *> unit.foreverM).start
_ <- timer.sleep(100.millis)
_ <- mv.put(())
} yield ()
try {
val task = for {
mv <- MVar[IO].empty[Unit]()
_ <- (mv.take *> unit.foreverM).start
_ <- IO.sleep(100.millis)
_ <- mv.put(())
} yield ()

val dt = 10.seconds
assert(task.unsafeRunTimed(dt).nonEmpty, s"timed-out after $dt")
} finally {
cancelLoop := true
}
val dt = 10.seconds
assert(task.unsafeRunTimed(dt).nonEmpty, s"timed-out after $dt")
} finally {
cancelLoop := true
}
} finally {
service.shutdown()
}
}

test("Semaphore does not block on release — typelevel/cats-effect#380") {
val service = Executors.newSingleThreadScheduledExecutor()
implicit val ec = ExecutionContext.global
implicit val cs = IO.contextShift(ec)
implicit val timer = IO.timer(ec, service)

try {
for (_ <- 0 until 10) {
val cancelLoop = Atomic(false)
val unit = IO {
if (cancelLoop.get()) throw new CancellationException
}
for (_ <- 0 until 10) {
val cancelLoop = Atomic(false)
val unit = IO {
if (cancelLoop.get()) throw new CancellationException
}

try {
val task = for {
mv <- Semaphore[IO](0)
_ <- (mv.acquire *> unit.foreverM).start
_ <- timer.sleep(100.millis)
_ <- mv.release
} yield ()
try {
val task = for {
mv <- Semaphore[IO](0)
_ <- (mv.acquire *> unit.foreverM).start
_ <- IO.sleep(100.millis)
_ <- mv.release
} yield ()

val dt = 10.seconds
assert(task.unsafeRunTimed(dt).nonEmpty, s"timed-out after $dt")
} finally {
cancelLoop := true
}
val dt = 10.seconds
assert(task.unsafeRunTimed(dt).nonEmpty, s"timed-out after $dt")
} finally {
cancelLoop := true
}
} finally {
service.shutdown()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monix.catnap

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import monix.execution.BufferCapacity.Bounded
import monix.execution.{BufferCapacity, Scheduler}
import monix.execution.schedulers.SchedulerService
Expand All @@ -42,8 +43,10 @@ abstract class ConcurrentChannelJVMSuite(parallelism: Int) extends BaseConcurren
if (n > 0) test.flatMap(_ => repeatTest(test, n - 1))
else IO.unit

testAsync(name) { implicit ec =>
repeatTest(f(ec).timeout(taskTimeout), times).unsafeToFuture()
test(name) { implicit ec =>
val overallTimeout = taskTimeout + 10.seconds
val result = repeatTest(f(ec).timeout(taskTimeout), times).unsafeRunTimed(overallTimeout)
assert(result.nonEmpty, s"; timed-out after $overallTimeout")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monix.catnap

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import monix.execution.Scheduler
import monix.execution.schedulers.SchedulerService
import scala.concurrent.duration._
Expand All @@ -35,13 +36,17 @@ abstract class ConcurrentQueueJVMSuite(parallelism: Int) extends BaseConcurrentQ
assert(env.awaitTermination(30.seconds), "env.awaitTermination")
}

val taskTimeout = 60.seconds

def testIO(name: String, times: Int = 1)(f: Scheduler => IO[Unit]): Unit = {
def repeatTest(test: IO[Unit], n: Int): IO[Unit] =
if (n > 0) test.flatMap(_ => repeatTest(test, n - 1))
else IO.unit

testAsync(name) { implicit ec =>
repeatTest(f(ec).timeout(60.second), times).unsafeToFuture()
test(name) { implicit ec =>
val overallTimeout = taskTimeout + 10.seconds
val result = repeatTest(f(ec).timeout(taskTimeout), times).unsafeRunTimed(overallTimeout)
assert(result.nonEmpty, s"; timed-out after $overallTimeout")
}
}
}
Expand Down
Loading
Loading