Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ abstract class EventLogFileWriter(
protected var writer: Option[PrintWriter] = None

protected def requireLogBaseDirAsDirectory(): Unit = {
if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
val basePath = new Path(logBaseDir)
if (!fileSystem.exists(basePath)) {
FileSystem.mkdirs(fileSystem, basePath, EventLogFileWriter.LOG_FOLDER_PERMISSIONS)
}
if (!fileSystem.getFileStatus(basePath).isDirectory) {
throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
} catch {
case f: FileNotFoundException =>
var msg = s"Log directory specified does not exist: $dir"
if (dir == DEFAULT_LOG_DIR) {
msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
if (FileSystem.mkdirs(dirFs, path, EventLogFileWriter.LOG_FOLDER_PERMISSIONS)) {
logInfo(log"Created missing history log directory ${MDC(HISTORY_DIR, dir)}.")
true
} else {
var msg = s"Log directory specified does not exist: $dir"
if (dir == DEFAULT_LOG_DIR) {
msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
}
logWarning(msg)
false
}
logWarning(msg)
false
}
}
require(validDirs.nonEmpty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
}
}

test("create missing spark.eventLog.dir automatically") {
val appId = getUniqueApplicationId
val attemptId = None
val missingDir = new File(testDir, "missing-event-log-dir")
val missingDirPath = new Path(missingDir.getAbsolutePath)
assert(!missingDir.exists())

val conf = getLoggingConf(missingDirPath, None)
val writer = createWriter(appId, attemptId, missingDirPath.toUri, conf,
SparkHadoopUtil.get.newConfiguration(conf))

writer.start()
writer.writeEvent("dummy", flushLogger = true)
writer.stop()

assert(missingDir.isDirectory)
}


test("Use the default value of spark.eventLog.compression.codec") {
val conf = new SparkConf
conf.set(EVENT_LOG_COMPRESS, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2266,6 +2266,29 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
}
}

test("create missing spark.history.fs.logDirectory automatically") {
val missingDir = Utils.createTempDir(namePrefix = "missingHistoryDir")
val missingDirPath = missingDir.getAbsolutePath
Utils.deleteRecursively(missingDir)
assert(!new File(missingDirPath).exists())

val conf = createTestConf().set(HISTORY_LOG_DIR, missingDirPath)
val provider = new FsHistoryProvider(conf)
try {
updateAndCheck(provider) { list =>
list.size should be(0)
}
new File(missingDirPath).isDirectory should be(true)
} finally {
provider.stop()
val recreatedDir = new File(missingDirPath)
if (recreatedDir.exists()) {
Utils.deleteRecursively(recreatedDir)
}
}
}


test("SPARK-55864: directory temporarily inaccessible then recovers") {
val dir2 = Utils.createTempDir(namePrefix = "logDir2")
try {
Expand Down