Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import pekko.remote.artery.tcp.TlsTcpSpec
import pekko.testkit.ImplicitSender
import pekko.testkit.TestActors
import pekko.testkit.TestProbe
import pekko.util.JavaVersion

import com.typesafe.config.ConfigFactory

Expand Down Expand Up @@ -136,13 +137,16 @@ class RotatingProviderWithChangingKeysSpec
deployKeySet("ssl/rsa-client.example.com")
awaitCacheExpiration()
val (_, pathEchoC) = buildRemoteWithEchoActor("C-reread")
try {
contact(remoteSysB.actorSystem, pathEchoC)
fail("The credentials under `ssl/rsa-client` are not valid for Pekko remote so contact() must fail.")
} catch {
case _: java.lang.AssertionError =>
// This assertion error is expected because we expect a failure in contact() since
// the SSL credentials are invalid

if (JavaVersion.majorVersion >= 25) {
// JDK 25+ strictly validates X.509 Extended Key Usage (EKU) constraints
// during TLS handshake. The client-only certificate is rejected immediately
// for server authentication, so we verify via the actor identification protocol.
verifyTlsRejectedByEkuValidation(remoteSysB.actorSystem, pathEchoC)
} else {
// On older JDKs, the TLS handshake with invalid certificates fails mid-exchange,
// causing the identification to time out with no response.
verifyTlsFailsDuringHandshake(remoteSysB.actorSystem, pathEchoC)
}

// deploy a new key set
Expand Down Expand Up @@ -269,6 +273,31 @@ abstract class RotatingKeysSSLEngineProviderSpec(extraConfig: String)
senderOnSource.expectMsg("ping-1")
}

/**
* JDK 25+ verification: Strict X.509 Extended Key Usage (EKU) validation
* rejects the client-only certificate immediately during TLS handshake.
* The remote actor cannot be reached, so ActorIdentity returns with ref=None.
*
* @see [[https://openjdk.org/jeps/512 JEP 512: Enforce Extended Key Usage in TLS Certificates]]
*/
protected def verifyTlsRejectedByEkuValidation(fromSystem: ActorSystem, toPath: ActorPath): Unit = {
val probe = TestProbe()(fromSystem)
fromSystem.actorSelection(toPath).tell(Identify(toPath.name), probe.ref)
val identity = probe.expectMsgType[ActorIdentity]
identity.ref shouldBe None
}

/**
* Pre-JDK 25 verification: The TLS handshake with invalid certificates fails
* mid-exchange, causing the Identify message to be lost in transit. No ActorIdentity
* response arrives at all, which we verify by asserting no message is received.
*/
protected def verifyTlsFailsDuringHandshake(fromSystem: ActorSystem, toPath: ActorPath): Unit = {
val probe = TestProbe()(fromSystem)
fromSystem.actorSelection(toPath).tell(Identify(toPath.name), probe.ref)
probe.expectNoMessage()
}

def buildRemoteWithEchoActor(id: String): (RemoteSystem, ActorPath) = {
val remoteSys = new RemoteSystem(s"system$id", extraConfig, newRemoteSystem, address)
systemsToTerminate :+= remoteSys.actorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
// 100K-element tests need extra headroom, especially on JDK 25+ where
// ForkJoinPool scheduling changes slow down highly-parallel workloads (#2573)
override implicit val patience: PatienceConfig =
PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))

val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class HubSpec extends StreamSpec {
// Long-stream tests (20K elements) need extra headroom on JDK 25+
// where ForkJoinPool scheduling changes cause slower throughput (#2573)
override implicit val patience: PatienceConfig =
PatienceConfig(timeout = Span(30, Seconds), interval = Span(1, Seconds))
PatienceConfig(timeout = Span(60, Seconds), interval = Span(1, Seconds))

"MergeHub" must {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.pekko.stream

import java.time.Instant
import java.util.concurrent.Executors
import java.util.concurrent.{ ExecutorService, Executors }

import scala.annotation.nowarn
import scala.concurrent.{ blocking, ExecutionContext, Future }
Expand Down Expand Up @@ -107,18 +107,22 @@ class MapAsyncPartitionedSpec

import MapAsyncPartitionedSpec.TestData._

// Property-based tests with blocking operations need extra headroom,
// especially on JDK 25+ with ForkJoinPool scheduling changes (#2573)
// These suites materialize many short-lived streams. On busy CI nodes,
// JDK 25 makes the 1000-sample property checks noticeably more expensive (#2573).
override implicit def patienceConfig: PatienceConfig = PatienceConfig(
timeout = 15.seconds,
timeout = 60.seconds,
interval = 100.millis)

private val heavyPropertyChecks = minSuccessful(100)

private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "test-system")
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
private val executor: ExecutorService = Executors.newCachedThreadPool()
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

override protected def afterAll(): Unit = {
system.terminate()
system.whenTerminated.futureValue
executor.shutdown()
super.afterAll()
}

Expand Down Expand Up @@ -149,7 +153,7 @@ class MapAsyncPartitionedSpec
}

it should "process elements in parallel preserving order in partition" in {
forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
forAll(heavyPropertyChecks) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
val result =
Source(elements.toIndexedSeq)
.mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(asyncOperation)
Expand All @@ -164,7 +168,7 @@ class MapAsyncPartitionedSpec
}

it should "process elements in sequence preserving order in partition" in {
forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) =>
forAll(heavyPropertyChecks) { (elements: Seq[TestKeyValue]) =>
val result =
Source
.fromIterator(() => elements.iterator)
Expand Down Expand Up @@ -301,7 +305,7 @@ class MapAsyncPartitionedSpec
}

it should "process elements in parallel preserving order in partition" in {
forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
forAll(heavyPropertyChecks) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
val result =
Source(elements.toIndexedSeq)
.mapAsyncPartitioned(parallelism.value)(partitioner)(asyncOperation)
Expand All @@ -316,7 +320,7 @@ class MapAsyncPartitionedSpec
}

it should "process elements in sequence preserving order in partition" in {
forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) =>
forAll(heavyPropertyChecks) { (elements: Seq[TestKeyValue]) =>
val result =
Source
.fromIterator(() => elements.iterator)
Expand Down
Loading