Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import ammonite.compiler.iface.CodeWrapper
import ammonite.interp.{Interpreter, Watchable}
import ammonite.main.Defaults
import ammonite.repl.Repl
import ammonite.runtime.Storage
import ammonite.util.{Bind, Imports, Name, PredefInfo, Ref, Res, Util}
import ammonite.util.Util.newLine

Expand Down Expand Up @@ -102,9 +103,13 @@ Spark session available as 'spark'.
|""".stripMargin
// Please note that we make ammonite generate classes instead of objects.
// Classes tend to have superior serialization behavior when using UDFs.
// SPARK-56448: Use Storage.InMemory to avoid stale compile cache across restarts.
// The default Storage.Folder persists compiled predef classes under ~/.ammonite. On a
// subsequent REPL start, the cached CodePredef references a stale ArgsPredef, causing NPE.
val main = new ammonite.Main(
welcomeBanner = Option(splash.format(spark_version, spark.version)),
predefCode = predefCode,
storageBackend = new Storage.InMemory(),
replCodeWrapper = ExtendedCodeClassWrapper,
scriptCodeWrapper = ExtendedCodeClassWrapper,
inputStream = inputStream,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.application

import java.util.concurrent.TimeUnit

import scala.collection.mutable.ArrayBuffer
import scala.sys.process.BasicIO

import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
import org.apache.spark.tags.AmmoniteTest

@AmmoniteTest
class AmmoniteReplE2ESuite extends ConnectFunSuite with RemoteSparkSession {

private def runSparkShell(): (Int, String, String) = {
val sparkHome = sys.props.getOrElse(
"spark.test.home",
sys.env.getOrElse("SPARK_HOME", fail("spark.test.home or SPARK_HOME not set")))
val command = Seq(s"$sparkHome/bin/spark-shell", "--remote", s"sc://localhost:$serverPort")

val process = new ProcessBuilder(command: _*).start()
// Close stdin immediately so shell exits on EOF
process.getOutputStream.close()

val stdout = new ArrayBuffer[String]()
val stderr = new ArrayBuffer[String]()
val stdoutThread = new Thread() {
setDaemon(true)
override def run(): Unit = BasicIO.processFully(stdout += _)(process.getInputStream)
}
val stderrThread = new Thread() {
setDaemon(true)
override def run(): Unit = BasicIO.processFully(stderr += _)(process.getErrorStream)
}
stdoutThread.start()
stderrThread.start()

val exited = process.waitFor(60, TimeUnit.SECONDS)
if (!exited) {
process.destroyForcibly()
fail("spark-shell did not exit within 60 seconds")
}
stdoutThread.join(10000)
stderrThread.join(10000)
(process.exitValue(), stdout.mkString("\n"), stderr.mkString("\n"))
}

test("SPARK-56448: restarting spark-shell --remote does not throw NPE") {
Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case seems to introduce a flakiness. I didn't dig much, but have you observed the following failures in your CIs, @yadavay-amzn and @attilapiros ?

    [info] AmmoniteReplE2ESuite:
    [info] - SPARK-56448: restarting spark-shell --remote does not throw NPE *** FAILED *** (1 second, 975 milliseconds)
    [info]   1 did not equal 0 First spark-shell failed (exit=1): WARNING: Using incubator modules: jdk.incubator.vector
    [info]   Exception in thread "main" java.lang.ExceptionInInitializerError
    [info]   	at org.apache.arrow.memory.netty.DefaultAllocationManagerFactory.<clinit>(DefaultAllocationManagerFactory.java:26)
...
    [info]   Caused by: java.lang.UnsupportedOperationException
    [info]   	at org.sparkproject.connect.client.io.netty.buffer.EmptyByteBuf.memoryAddress(EmptyByteBuf.java:961)
    [info]   	at org.sparkproject.connect.client.io.netty.buffer.UnsafeDirectLittleEndian.<init>(UnsafeDirectLittleEndian.java:45)
    [info]   	at org.sparkproject.connect.client.io.netty.buffer.PooledByteBufAllocatorL.<init>(PooledByteBufAllocatorL.java:47)
    [info]   	... 32 more (AmmoniteReplE2ESuite.scala:66)

cc @peter-toth

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongjoon-hyun Thanks for taking a look. We haven't seen this specific Arrow/Netty failure in our CI runs.
The CI failures we saw were always the OracleIntegrationSuite Docker test.

The test launches spark-shell --remote as a subprocess, so it's sensitive to the JDK environment the CI runner uses. Would it help to add a retry or mark this test as flaky?

I'll also investigate the Arrow/Netty root cause by running the test locally to see if the CI flaky behavior is reproducible locally.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ran the AmmoniteReplE2ESuite test 10 times locally on current master (JDK 17) and its passing consistently. So the flakiness seems specific to the CI run.

The error (ExceptionInInitializerError in Arrow's DefaultAllocationManagerFactory / Netty's PooledByteBufAllocatorL) looks like a JDK/Arrow memory allocator incompatibility which may be JDK version sensitive. The test itself just launches spark-shell --remote as a subprocess and it doesn't do anything Arrow/Netty-specific.

@dongjoon-hyun / @peter-toth Could you share which JDK vendor/version and CI environment produced the failure? Or a link to the CI job if you have it? I can deep dive to figure out the issue and try repro with same setup.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for confirming, @yadavay-amzn . For my cases, the failures are consistently observed with Java 25 + UBI 10 combination.

[root@1d494a9a9969 /]# java --version
openjdk 25.0.3 2026-04-21 LTS
...

[root@1d494a9a9969 /]# cat /etc/os-release
NAME="Red Hat Enterprise Linux"
VERSION="10.1 (Coughlan)"

// First invocation
val (exit1, _, stderr1) = runSparkShell()
assert(exit1 == 0, s"First spark-shell failed (exit=$exit1): $stderr1")

// Second invocation -- without the fix, this would NPE from stale Ammonite cache
val (exit2, _, stderr2) = runSparkShell()
assert(exit2 == 0, s"Second spark-shell failed (exit=$exit2): $stderr2")
}
}