66 changes: 53 additions & 13 deletions core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -17,12 +17,13 @@

package org.apache.spark.storage

import java.io.DataInputStream
import java.io.{DataInputStream, InputStream}
import java.nio.ByteBuffer

import scala.concurrent.Future
import scala.reflect.ClassTag

import io.netty.buffer.Unpooled
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

@@ -31,8 +32,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.util.{JavaUtils, LimitedInputStream}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
@@ -114,6 +115,51 @@ private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf
}
}

/**
* Lazily reads a segment of a Hadoop FileSystem file; the data is read only when createInputStream is called.
* @param filesystem the Hadoop FileSystem holding the file
* @param file path of the file
* @param offset offset of the segment within the file
* @param length size of the segment in bytes
*/
private[storage] class FileSystemSegmentManagedBuffer(
filesystem: FileSystem,
file: Path,
offset: Long,
length: Long) extends ManagedBuffer with Logging {

override def size(): Long = length

override def nioByteBuffer(): ByteBuffer = {
Utils.tryWithResource(createInputStream()) { in =>
ByteBuffer.wrap(in.readAllBytes())
}
}

override def createInputStream(): InputStream = {
val startTimeNs = System.nanoTime()
try {
val in = filesystem.open(file)
in.seek(offset)
new LimitedInputStream(in, length)
} finally {
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
}
}

override def retain(): ManagedBuffer = this

override def release(): ManagedBuffer = this

override def convertToNetty(): AnyRef = {
Unpooled.wrappedBuffer(nioByteBuffer())
}

override def convertToNettyForSsl(): AnyRef = {
Unpooled.wrappedBuffer(nioByteBuffer())
}
}
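// Editor's sketch (not part of this change): how such a buffer is typically consumed.
// Constructing it performs no I/O; the Hadoop file is opened only when createInputStream()
// is called (nioByteBuffer and the convertToNetty* methods do so indirectly).
// The path, offset and length below are illustrative assumptions.
//
//   val fs = FileSystem.getLocal(new Configuration())
//   val buf = new FileSystemSegmentManagedBuffer(fs, new Path("/tmp/shuffle.data"), 0L, 128L)
//   Utils.tryWithResource(buf.createInputStream()) { in =>
//     val bytes = in.readAllBytes()  // reads exactly `length` bytes starting at `offset`
//   }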

private[spark] object FallbackStorage extends Logging {
/** We use one block manager id as a placeholder. */
val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
@@ -168,7 +214,9 @@ private[spark] object FallbackStorage extends Logging {
}

/**
* Read a ManagedBuffer.
* Read a block as a ManagedBuffer. This reads the index file for the block's offset and
* size, but does not read the actual block data. The data is read later, when
* createInputStream() is called on the returned ManagedBuffer.
*/
def read(conf: SparkConf, blockId: BlockId): ManagedBuffer = {
logInfo(log"Read ${MDC(BLOCK_ID, blockId)}")
@@ -202,15 +250,7 @@
val hash = JavaUtils.nonNegativeHash(name)
val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
val size = nextOffset - offset
logDebug(s"To byte array $size")
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
f.seek(offset)
f.readFully(array)
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
}
new NioManagedBuffer(ByteBuffer.wrap(array))
new FileSystemSegmentManagedBuffer(fallbackFileSystem, dataFile, offset, size)
}
}
}
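
For context, a minimal consumer-side sketch follows (an editor's illustration, not code from this PR). It assumes the fallback storage path is configured and the shuffle block has already been written, mirroring the test suite below. With the change above, FallbackStorage.read only touches the index file; the data file is opened when createInputStream() is called on the returned buffer.

import org.apache.spark.SparkConf
import org.apache.spark.storage.{FallbackStorage, ShuffleBlockId}
import org.apache.spark.util.Utils

// Hypothetical helper; FallbackStorage is private[spark], so this sketch assumes it is
// called from code under the org.apache.spark package, as the test suite below is.
def streamFallbackBlock(conf: SparkConf): Long = {
  // Reads only the index file to locate the segment; no block data is read yet.
  val buffer = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
  // The data file is opened here, when the stream is created.
  Utils.tryWithResource(buffer.createInputStream()) { in =>
    var count = 0L
    while (in.read() != -1) { count += 1 }
    count // equals buffer.size() when the block is intact
  }
}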
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -22,10 +22,11 @@ import java.nio.file.Files
import scala.concurrent.duration._
import scala.util.Random

import io.netty.buffer.ByteBuf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, never, verify, when}
import org.mockito.Mockito.{mock, never, spy, times, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
@@ -110,7 +111,9 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
intercept[java.io.EOFException] {
FallbackStorage.read(conf, ShuffleBlockId(1, 1L, 0))
}
FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
assert(readResult.isInstanceOf[FileSystemSegmentManagedBuffer])
readResult.createInputStream().close()
}

test("SPARK-39200: fallback storage APIs - readFully") {
@@ -155,9 +158,49 @@
assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L, NOOP_REDUCE_ID).name))

val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
assert(readResult.isInstanceOf[FileSystemSegmentManagedBuffer])
assert(readResult.nioByteBuffer().array().sameElements(content))
}

test("SPARK-55469: FileSystemSegmentManagedBuffer reads block data lazily") {
withTempDir { dir =>
val fs = FileSystem.getLocal(new Configuration())
val file = new Path(dir.getAbsolutePath, "file")
val data = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
tryWithResource(fs.create(file)) { os => os.write(data) }

Seq((0, 4), (1, 2), (4, 4), (7, 2), (8, 0)).foreach { case (offset, length) =>
val clue = s"offset: $offset, length: $length"

// creating the managed buffer does not open the file
val mfs = spy(fs)
val buf = new FileSystemSegmentManagedBuffer(mfs, file, offset, length)
verify(mfs, never()).open(mc.any[Path]())
assert(buf.size() === length, clue)

// creating the input stream opens the file
{
val bytes = buf.createInputStream().readAllBytes()
verify(mfs, times(1)).open(mc.any[Path]())
assert(bytes.mkString(",") === data.slice(offset, offset + length).mkString(","), clue)
}

// getting a NIO ByteBuffer opens the file again
{
val bytes = buf.nioByteBuffer().array()
verify(mfs, times(2)).open(mc.any[Path]())
assert(bytes.mkString(",") === data.slice(offset, offset + length).mkString(","), clue)
}

// converting to Netty ByteBufs opens the file again each time
assert(buf.convertToNetty().asInstanceOf[ByteBuf].release() === length > 0, clue)
verify(mfs, times(3)).open(mc.any[Path]())
assert(buf.convertToNettyForSsl().asInstanceOf[ByteBuf].release() === length > 0, clue)
verify(mfs, times(4)).open(mc.any[Path]())
}
}
}

test("SPARK-34142: fallback storage API - cleanUp app") {
withTempDir { dir =>
Seq(true, false).foreach { cleanUp =>
@@ -372,6 +415,7 @@
}
}
}

class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
with Seekable with PositionedReadable {
override def read: Int = in.read