-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala       |  17
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala               |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala           |  78
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                         |  53
-rw-r--r--  core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala          | 180
-rw-r--r--  core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala   | 163
-rw-r--r--  core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala         | 139
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala           |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala   |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala             | 225
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala                    |  32
-rw-r--r--  docs/configuration.md                                                         |  39
12 files changed, 895 insertions, 40 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index d27e0e1f15..d09136de49 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -23,9 +23,10 @@ import akka.actor.ActorRef
import com.google.common.base.Charsets
import com.google.common.io.Files
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.logging.FileAppender
/**
* Manages the execution of one executor process.
@@ -42,12 +43,15 @@ private[spark] class ExecutorRunner(
val sparkHome: File,
val workDir: File,
val workerUrl: String,
+ val conf: SparkConf,
var state: ExecutorState.Value)
extends Logging {
val fullId = appId + "/" + execId
var workerThread: Thread = null
var process: Process = null
+ var stdoutAppender: FileAppender = null
+ var stderrAppender: FileAppender = null
// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
@@ -76,6 +80,13 @@ private[spark] class ExecutorRunner(
if (process != null) {
logInfo("Killing process!")
process.destroy()
+ process.waitFor()
+ if (stdoutAppender != null) {
+ stdoutAppender.stop()
+ }
+ if (stderrAppender != null) {
+ stderrAppender.stop()
+ }
val exitCode = process.waitFor()
worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
}
@@ -137,11 +148,11 @@ private[spark] class ExecutorRunner(
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
- CommandUtils.redirectStream(process.getInputStream, stdout)
+ stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, Charsets.UTF_8)
- CommandUtils.redirectStream(process.getErrorStream, stderr)
+ stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
// long-lived processes only. However, in the future, we might restart the executor a few
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 100de26170..a0ecaf709f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -235,7 +235,7 @@ private[spark] class Worker(
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host,
appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
- workDir, akkaUrl, ExecutorState.RUNNING)
+ workDir, akkaUrl, conf, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index 8381f59672..6a5ffb1b71 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -24,8 +24,10 @@ import scala.xml.Node
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
-private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
+private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
private val worker = parent.worker
private val workDir = parent.workDir
@@ -39,21 +41,18 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
- val path = (appId, executorId, driverId) match {
+ val logDir = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
- s"${workDir.getPath}/$appId/$executorId/$logType"
+ s"${workDir.getPath}/$appId/$executorId/"
case (None, None, Some(d)) =>
- s"${workDir.getPath}/$driverId/$logType"
+ s"${workDir.getPath}/$driverId/"
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}
- val (startByte, endByte) = getByteRange(path, offset, byteLength)
- val file = new File(path)
- val logLength = file.length
-
- val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
- pre + Utils.offsetBytes(path, startByte, endByte)
+ val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
+ val pre = s"==== Bytes $startByte-$endByte of $logLength of $logDir$logType ====\n"
+ pre + logText
}
def render(request: HttpServletRequest): Seq[Node] = {
@@ -65,19 +64,16 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
- val (path, params) = (appId, executorId, driverId) match {
+ val (logDir, params) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
- (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+ (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
case (None, None, Some(d)) =>
- (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+ (s"${workDir.getPath}/$d/", s"driverId=$d")
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}
- val (startByte, endByte) = getByteRange(path, offset, byteLength)
- val file = new File(path)
- val logLength = file.length
- val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+ val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
@@ -127,23 +123,37 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
UIUtils.basicSparkPage(content, logType + " log page for " + appId)
}
- /** Determine the byte range for a log or log page. */
- private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
- val defaultBytes = 100 * 1024
- val maxBytes = 1024 * 1024
- val file = new File(path)
- val logLength = file.length()
- val getOffset = offset.getOrElse(logLength - defaultBytes)
- val startByte =
- if (getOffset < 0) {
- 0L
- } else if (getOffset > logLength) {
- logLength
- } else {
- getOffset
+ /** Get the requested portion of the log files, given an offset and the desired number of bytes */
+ private def getLog(
+ logDirectory: String,
+ logType: String,
+ offsetOption: Option[Long],
+ byteLength: Int
+ ): (String, Long, Long, Long) = {
+ try {
+ val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
+ logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")
+
+ val totalLength = files.map { _.length }.sum
+ val offset = offsetOption.getOrElse(totalLength - byteLength)
+ val startIndex = {
+ if (offset < 0) {
+ 0L
+ } else if (offset > totalLength) {
+ totalLength
+ } else {
+ offset
+ }
}
- val logPageLength = math.min(byteLength, maxBytes)
- val endByte = math.min(startByte + logPageLength, logLength)
- (startByte, endByte)
+ val endIndex = math.min(startIndex + byteLength, totalLength)
+ logDebug(s"Getting log from $startIndex to $endIndex")
+ val logText = Utils.offsetBytes(files, startIndex, endIndex)
+ logDebug(s"Got log of length ${logText.length} bytes")
+ (logText, startIndex, endIndex, totalLength)
+ } catch {
+ case e: Exception =>
+ logError(s"Error getting $logType logs from directory $logDirectory", e)
+ ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3b1b6df089..4ce28bb0cf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -863,6 +863,59 @@ private[spark] object Utils extends Logging {
}
/**
+ * Return a string containing data across a set of files. The `start`
+ * and `end` offsets are based on the cumulative size of all the files taken in
+ * the given order. See the figure below for more details.
+ */
+ def offsetBytes(files: Seq[File], start: Long, end: Long): String = {
+ val fileLengths = files.map { _.length }
+ val startIndex = math.max(start, 0)
+ val endIndex = math.min(end, fileLengths.sum)
+ val fileToLength = files.zip(fileLengths).toMap
+ logDebug("Log files: \n" + fileToLength.mkString("\n"))
+
+ val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
+ var sum = 0L
+ for (file <- files) {
+ val startIndexOfFile = sum
+ val endIndexOfFile = sum + fileToLength(file)
+ logDebug(s"Processing file $file, " +
+ s"with start index = $startIndexOfFile, end index = $endIndex")
+
+ /*
+ ____________
+ range 1: | |
+ | case A |
+
+ files: |==== file 1 ====|====== file 2 ======|===== file 3 =====|
+
+ | case B . case C . case D |
+ range 2: |___________.____________________.______________|
+ */
+
+ if (startIndex <= startIndexOfFile && endIndex >= endIndexOfFile) {
+ // Case C: read the whole file
+ stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, fileToLength(file)))
+ } else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) {
+ // Case A and B: read from [start of required range] to [end of file / end of range]
+ val effectiveStartIndex = startIndex - startIndexOfFile
+ val effectiveEndIndex = math.min(endIndex - startIndexOfFile, fileToLength(file))
+ stringBuffer.append(Utils.offsetBytes(
+ file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+ } else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) {
+ // Case D: read from [start of file] to [end of required range]
+ val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0)
+ val effectiveEndIndex = endIndex - startIndexOfFile
+ stringBuffer.append(Utils.offsetBytes(
+ file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+ }
+ sum += fileToLength(file)
+ logDebug(s"After processing file $file, string built is ${stringBuffer.toString}}")
+ }
+ stringBuffer.toString
+ }
+
+ /**
* Clone an object using a Spark serializer.
*/
def clone[T: ClassTag](value: T, serializer: SerializerInstance): T = {
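For context, here is a minimal standalone sketch (not part of this patch; all names are hypothetical) of the per-file windowing that `Utils.offsetBytes(files, start, end)` performs: intersect the requested cumulative range with each file's own range, which is exactly what cases A-D above distinguish.

    import java.io.File

    // Hypothetical helper: for each file, compute the (startWithinFile, endWithinFile)
    // window that the requested cumulative [start, end) range covers.
    // Files whose window is empty are skipped.
    object OffsetWindowSketch {
      def windows(files: Seq[File], start: Long, end: Long): Seq[(File, Long, Long)] = {
        val total = files.map(_.length).sum
        val lo = math.max(start, 0L)
        val hi = math.min(end, total)
        var cursor = 0L
        files.flatMap { file =>
          val fileStart = cursor
          val fileEnd = cursor + file.length
          cursor = fileEnd
          val from = math.max(lo, fileStart) - fileStart   // offset within this file
          val until = math.min(hi, fileEnd) - fileStart
          if (from < until) Some((file, from, until)) else None
        }
      }
    }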
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
new file mode 100644
index 0000000000..8e9c3036d0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{File, FileOutputStream, InputStream}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.{IntParam, Utils}
+
+/**
+ * Continuously appends the data from an input stream into the given file.
+ */
+private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
+ extends Logging {
+ @volatile private var outputStream: FileOutputStream = null
+ @volatile private var markedForStop = false // has the appender been asked to stop
+ @volatile private var stopped = false // has the appender stopped
+
+ // Thread that reads the input stream and writes to file
+ private val writingThread = new Thread("File appending thread for " + file) {
+ setDaemon(true)
+ override def run() {
+ Utils.logUncaughtExceptions {
+ appendStreamToFile()
+ }
+ }
+ }
+ writingThread.start()
+
+ /**
+ * Wait for the appender to stop appending, either because the input stream is closed
+ * or because of an error while appending
+ */
+ def awaitTermination() {
+ synchronized {
+ if (!stopped) {
+ wait()
+ }
+ }
+ }
+
+ /** Stop the appender */
+ def stop() {
+ markedForStop = true
+ }
+
+ /** Continuously read chunks from the input stream and append to the file */
+ protected def appendStreamToFile() {
+ try {
+ logDebug("Started appending thread")
+ openFile()
+ val buf = new Array[Byte](bufferSize)
+ var n = 0
+ while (!markedForStop && n != -1) {
+ n = inputStream.read(buf)
+ if (n != -1) {
+ appendToFile(buf, n)
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Error writing stream to file $file", e)
+ } finally {
+ closeFile()
+ synchronized {
+ stopped = true
+ notifyAll()
+ }
+ }
+ }
+
+ /** Append bytes to the file output stream */
+ protected def appendToFile(bytes: Array[Byte], len: Int) {
+ if (outputStream == null) {
+ openFile()
+ }
+ outputStream.write(bytes, 0, len)
+ }
+
+ /** Open the file output stream */
+ protected def openFile() {
+ outputStream = new FileOutputStream(file, false)
+ logDebug(s"Opened file $file")
+ }
+
+ /** Close the file output stream */
+ protected def closeFile() {
+ outputStream.flush()
+ outputStream.close()
+ logDebug(s"Closed file $file")
+ }
+}
+
+/**
+ * Companion object to [[org.apache.spark.util.logging.FileAppender]] which has helper
+ * functions to choose the correct type of FileAppender based on SparkConf configuration.
+ */
+private[spark] object FileAppender extends Logging {
+
+ /** Create the right appender based on Spark configuration */
+ def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+
+ import RollingFileAppender._
+
+ val rollingStrategy = conf.get(STRATEGY_PROPERTY, STRATEGY_DEFAULT)
+ val rollingSizeBytes = conf.get(SIZE_PROPERTY, SIZE_DEFAULT)
+ val rollingInterval = conf.get(INTERVAL_PROPERTY, INTERVAL_DEFAULT)
+
+ def createTimeBasedAppender() = {
+ val validatedParams: Option[(Long, String)] = rollingInterval match {
+ case "daily" =>
+ logInfo(s"Rolling executor logs enabled for $file with daily rolling")
+ Some(24 * 60 * 60 * 1000L, "--yyyy-MM-dd")
+ case "hourly" =>
+ logInfo(s"Rolling executor logs enabled for $file with hourly rolling")
+ Some(60 * 60 * 1000L, "--yyyy-MM-dd--HH")
+ case "minutely" =>
+ logInfo(s"Rolling executor logs enabled for $file with rolling every minute")
+ Some(60 * 1000L, "--yyyy-MM-dd--HH-mm")
+ case IntParam(seconds) =>
+ logInfo(s"Rolling executor logs enabled for $file with rolling $seconds seconds")
+ Some(seconds * 1000L, "--YYYY-MM-dd--HH-mm-ss")
+ case _ =>
+ logWarning(s"Illegal interval for rolling executor logs [$rollingInterval], " +
+ s"rolling logs not enabled")
+ None
+ }
+ validatedParams.map {
+ case (interval, pattern) =>
+ new RollingFileAppender(
+ inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+ }.getOrElse {
+ new FileAppender(inputStream, file)
+ }
+ }
+
+ def createSizeBasedAppender() = {
+ rollingSizeBytes match {
+ case IntParam(bytes) =>
+ logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes")
+ new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf)
+ case _ =>
+ logWarning(
+ s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
+ new FileAppender(inputStream, file)
+ }
+ }
+
+ rollingStrategy match {
+ case "" =>
+ new FileAppender(inputStream, file)
+ case "time" =>
+ createTimeBasedAppender()
+ case "size" =>
+ createSizeBasedAppender()
+ case _ =>
+ logWarning(
+ s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
+ s"rolling logs not enabled")
+ new FileAppender(inputStream, file)
+ }
+ }
+}
+
+
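As a hedged usage sketch of the factory above (callers must live inside Spark, since the classes are private[spark]; the path and values below are illustrative only), this is how a size-based rolling appender would be requested:

    package org.apache.spark.util.logging  // inside Spark: FileAppender is private[spark]

    import java.io.{ByteArrayInputStream, File}

    import org.apache.spark.SparkConf

    object FileAppenderSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.executor.logs.rolling.strategy", "size")
          .set("spark.executor.logs.rolling.size.maxBytes", "134217728")  // roll at ~128 MB
          .set("spark.executor.logs.rolling.maxRetainedFiles", "3")       // keep the 3 newest rolled files

        val in = new ByteArrayInputStream("example executor output\n".getBytes("UTF-8"))
        // FileAppender.apply inspects the conf and returns a RollingFileAppender here
        val appender = FileAppender(in, new File("/tmp/stdout"), conf)
        appender.awaitTermination()  // returns once the (already exhausted) stream is drained
      }
    }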
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
new file mode 100644
index 0000000000..1bbbd20cf0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{File, FileFilter, InputStream}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.SparkConf
+import RollingFileAppender._
+
+/**
+ * Continuously appends data from the input stream into the given file, and rolls
+ * over the file whenever the given rolling policy dictates. The rolled over files
+ * are named with a suffix generated by the policy.
+ *
+ * @param inputStream Input stream to read data from
+ * @param activeFile File to write data to
+ * @param rollingPolicy Policy based on which files will be rolled over.
+ * @param conf SparkConf that is used to pass on extra configurations
+ * @param bufferSize Optional buffer size. Used mainly for testing.
+ */
+private[spark] class RollingFileAppender(
+ inputStream: InputStream,
+ activeFile: File,
+ val rollingPolicy: RollingPolicy,
+ conf: SparkConf,
+ bufferSize: Int = DEFAULT_BUFFER_SIZE
+ ) extends FileAppender(inputStream, activeFile, bufferSize) {
+
+ private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
+
+ /** Stop the appender */
+ override def stop() {
+ super.stop()
+ }
+
+ /** Append bytes to the file, rolling it over first if necessary */
+ override protected def appendToFile(bytes: Array[Byte], len: Int) {
+ if (rollingPolicy.shouldRollover(len)) {
+ rollover()
+ rollingPolicy.rolledOver()
+ }
+ super.appendToFile(bytes, len)
+ rollingPolicy.bytesWritten(len)
+ }
+
+ /** Roll over the file by closing the output stream and moving the active file to a rolled-over file */
+ private def rollover() {
+ try {
+ closeFile()
+ moveFile()
+ openFile()
+ if (maxRetainedFiles > 0) {
+ deleteOldFiles()
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Error rolling over $activeFile", e)
+ }
+ }
+
+ /** Move the active log file to a new rollover file */
+ private def moveFile() {
+ val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
+ val rolloverFile = new File(
+ activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
+ try {
+ logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
+ if (activeFile.exists) {
+ if (!rolloverFile.exists) {
+ FileUtils.moveFile(activeFile, rolloverFile)
+ logInfo(s"Rolled over $activeFile to $rolloverFile")
+ } else {
+ // In case the rollover file name clashes, make a unique file name.
+ // The resultant file names are long and ugly, so this is used only
+ // if there is a name collision. This can be avoided by using
+ // the right pattern such that name collisions do not occur.
+ var i = 0
+ var altRolloverFile: File = null
+ do {
+ altRolloverFile = new File(activeFile.getParent,
+ s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
+ i += 1
+ } while (i < 10000 && altRolloverFile.exists)
+
+ logWarning(s"Rollover file $rolloverFile already exists, " +
+ s"rolled over $activeFile to file $altRolloverFile")
+ FileUtils.moveFile(activeFile, altRolloverFile)
+ }
+ } else {
+ logWarning(s"File $activeFile does not exist")
+ }
+ }
+ }
+
+ /** Retain only the last few rolled-over files */
+ private[util] def deleteOldFiles() {
+ try {
+ val rolledoverFiles = activeFile.getParentFile.listFiles(new FileFilter {
+ def accept(f: File): Boolean = {
+ f.getName.startsWith(activeFile.getName) && f != activeFile
+ }
+ }).sorted
+ val filesToBeDeleted = rolledoverFiles.take(
+ math.max(0, rolledoverFiles.size - maxRetainedFiles))
+ filesToBeDeleted.foreach { file =>
+ logInfo(s"Deleting file executor log file ${file.getAbsolutePath}")
+ file.delete()
+ }
+ } catch {
+ case e: Exception =>
+ logError("Error cleaning logs in directory " + activeFile.getParentFile.getAbsolutePath, e)
+ }
+ }
+}
+
+/**
+ * Companion object to [[org.apache.spark.util.logging.RollingFileAppender]]. Defines
+ * names of configurations that configure rolling file appenders.
+ */
+private[spark] object RollingFileAppender {
+ val STRATEGY_PROPERTY = "spark.executor.logs.rolling.strategy"
+ val STRATEGY_DEFAULT = ""
+ val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval"
+ val INTERVAL_DEFAULT = "daily"
+ val SIZE_PROPERTY = "spark.executor.logs.rolling.size.maxBytes"
+ val SIZE_DEFAULT = (1024 * 1024).toString
+ val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
+ val DEFAULT_BUFFER_SIZE = 8192
+
+ /**
+ * Get the sorted list of rolled-over files. This assumes that all the rolled-over
+ * file names are prefixed with the `activeFileName`, and that the active file
+ * holds the latest logs. So it sorts all the rolled-over logs (that are
+ * prefixed with `activeFileName`) and appends the active file at the end.
+ */
+ def getSortedRolledOverFiles(directory: String, activeFileName: String): Seq[File] = {
+ val rolledOverFiles = new File(directory).getAbsoluteFile.listFiles.filter { file =>
+ val fileName = file.getName
+ fileName.startsWith(activeFileName) && fileName != activeFileName
+ }.sorted
+ val activeFile = {
+ val file = new File(directory, activeFileName).getAbsoluteFile
+ if (file.exists) Some(file) else None
+ }
+ rolledOverFiles ++ activeFile
+ }
+}
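A hedged sketch of how the helper above is meant to be consumed, mirroring what LogPage.getLog does; the directory and file name are illustrative and assume the executor work directory exists (the code must live inside Spark since the object is private[spark]):

    package org.apache.spark.util.logging  // getSortedRolledOverFiles is private[spark]

    object LogLengthSketch {
      def main(args: Array[String]): Unit = {
        // Rolled-over files first (oldest to newest), active "stderr" file last
        val files = RollingFileAppender.getSortedRolledOverFiles("/tmp/work/app-1/0", "stderr")
        val totalLength = files.map(_.length).sum
        // The oldest bytes sit at cumulative offset 0 of the head of this list; the newest
        // bytes are at the end of the active file, so "tailing" means reading near totalLength.
        println(s"$totalLength bytes of stderr across ${files.size} file(s)")
      }
    }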
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala
new file mode 100644
index 0000000000..84e5c3c917
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.text.SimpleDateFormat
+import java.util.Calendar
+
+import org.apache.spark.Logging
+
+/**
+ * Defines the policy based on which [[org.apache.spark.util.logging.RollingFileAppender]] will
+ * generate rolling files.
+ */
+private[spark] trait RollingPolicy {
+
+ /** Whether rollover should be initiated at this moment */
+ def shouldRollover(bytesToBeWritten: Long): Boolean
+
+ /** Notify that rollover has occurred */
+ def rolledOver()
+
+ /** Notify that bytes have been written */
+ def bytesWritten(bytes: Long)
+
+ /** Get the desired suffix for the rolled-over file */
+ def generateRolledOverFileSuffix(): String
+}
+
+/**
+ * Defines a [[org.apache.spark.util.logging.RollingPolicy]] by which files will be rolled
+ * over at a fixed interval.
+ */
+private[spark] class TimeBasedRollingPolicy(
+ var rolloverIntervalMillis: Long,
+ rollingFileSuffixPattern: String,
+ checkIntervalConstraint: Boolean = true // set to false while testing
+ ) extends RollingPolicy with Logging {
+
+ import TimeBasedRollingPolicy._
+ if (checkIntervalConstraint && rolloverIntervalMillis < MINIMUM_INTERVAL_SECONDS * 1000L) {
+ logWarning(s"Rolling interval [${rolloverIntervalMillis/1000L} seconds] is too small. " +
+ s"Setting the interval to the acceptable minimum of $MINIMUM_INTERVAL_SECONDS seconds.")
+ rolloverIntervalMillis = MINIMUM_INTERVAL_SECONDS * 1000L
+ }
+
+ @volatile private var nextRolloverTime = calculateNextRolloverTime()
+ private val formatter = new SimpleDateFormat(rollingFileSuffixPattern)
+
+ /** Should rollover if current time has exceeded next rollover time */
+ def shouldRollover(bytesToBeWritten: Long): Boolean = {
+ System.currentTimeMillis > nextRolloverTime
+ }
+
+ /** Rollover has occurred, so find the next time to rollover */
+ def rolledOver() {
+ nextRolloverTime = calculateNextRolloverTime()
+ logDebug(s"Current time: ${System.currentTimeMillis}, next rollover time: " + nextRolloverTime)
+ }
+
+ def bytesWritten(bytes: Long) { } // nothing to do
+
+ private def calculateNextRolloverTime(): Long = {
+ val now = System.currentTimeMillis()
+ val targetTime = (
+ math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis
+ ).toLong
+ logDebug(s"Next rollover time is $targetTime")
+ targetTime
+ }
+
+ def generateRolledOverFileSuffix(): String = {
+ formatter.format(Calendar.getInstance.getTime)
+ }
+}
+
+private[spark] object TimeBasedRollingPolicy {
+ val MINIMUM_INTERVAL_SECONDS = 60L // 1 minute
+}
+
+/**
+ * Defines a [[org.apache.spark.util.logging.RollingPolicy]] by which files will be rolled
+ * over after reaching a particular size.
+ */
+private[spark] class SizeBasedRollingPolicy(
+ var rolloverSizeBytes: Long,
+ checkSizeConstraint: Boolean = true // set to false while testing
+ ) extends RollingPolicy with Logging {
+
+ import SizeBasedRollingPolicy._
+ if (checkSizeConstraint && rolloverSizeBytes < MINIMUM_SIZE_BYTES) {
+ logWarning(s"Rolling size [$rolloverSizeBytes bytes] is too small. " +
+ s"Setting the size to the acceptable minimum of $MINIMUM_SIZE_BYTES bytes.")
+ rolloverSizeBytes = MINIMUM_SIZE_BYTES
+ }
+
+ @volatile private var bytesWrittenSinceRollover = 0L
+ val formatter = new SimpleDateFormat("--yyyy-MM-dd--HH-mm-ss--SSSS")
+
+ /** Should rollover if the next set of bytes is going to exceed the size limit */
+ def shouldRollover(bytesToBeWritten: Long): Boolean = {
+ logInfo(s"$bytesToBeWritten + $bytesWrittenSinceRollover > $rolloverSizeBytes")
+ bytesToBeWritten + bytesWrittenSinceRollover > rolloverSizeBytes
+ }
+
+ /** Rollover has occurred, so reset the counter */
+ def rolledOver() {
+ bytesWrittenSinceRollover = 0
+ }
+
+ /** Increment the bytes that have been written in the current file */
+ def bytesWritten(bytes: Long) {
+ bytesWrittenSinceRollover += bytes
+ }
+
+ /** Get the desired suffix for the rolled-over file */
+ def generateRolledOverFileSuffix(): String = {
+ formatter.format(Calendar.getInstance.getTime)
+ }
+}
+
+private[spark] object SizeBasedRollingPolicy {
+ val MINIMUM_SIZE_BYTES = RollingFileAppender.DEFAULT_BUFFER_SIZE * 10
+}
+
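To make the size-based policy's contract concrete, a small hedged sketch (constraint check disabled, as the unit tests do, so a tiny limit can be used; not part of the patch):

    package org.apache.spark.util.logging  // the policies are private[spark]

    object RollingPolicySketch {
      def main(args: Array[String]): Unit = {
        val policy = new SizeBasedRollingPolicy(rolloverSizeBytes = 1000, checkSizeConstraint = false)
        policy.bytesWritten(900)            // 900 bytes already written to the current file
        assert(!policy.shouldRollover(50))  // 950 <= 1000: keep appending to the same file
        assert(policy.shouldRollover(200))  // 1100 > 1000: roll over before this write
        policy.rolledOver()                 // byte counter resets for the new file
        assert(!policy.shouldRollover(200))
      }
    }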
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index bfae32dae0..01ab2d5493 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.SparkConf
class JsonProtocolSuite extends FunSuite {
@@ -116,7 +117,8 @@ class JsonProtocolSuite extends FunSuite {
}
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
- new File("sparkHome"), new File("workDir"), "akka://worker", ExecutorState.RUNNING)
+ new File("sparkHome"), new File("workDir"), "akka://worker",
+ new SparkConf, ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 8ae387fa0b..e5f748d555 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.scalatest.FunSuite
import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.SparkConf
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
@@ -32,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
- f("ooga"), "blah", ExecutorState.RUNNING)
+ f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
assert(er.getCommandSeq.last === appId)
}
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
new file mode 100644
index 0000000000..53d7f5c607
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io._
+
+import scala.collection.mutable.HashSet
+import scala.reflect._
+
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.spark.{Logging, SparkConf}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy, FileAppender}
+
+class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
+
+ val testFile = new File("FileAppenderSuite-test-" + System.currentTimeMillis).getAbsoluteFile
+
+ before {
+ cleanup()
+ }
+
+ after {
+ cleanup()
+ }
+
+ test("basic file appender") {
+ val testString = (1 to 1000).mkString(", ")
+ val inputStream = IOUtils.toInputStream(testString)
+ val appender = new FileAppender(inputStream, testFile)
+ inputStream.close()
+ appender.awaitTermination()
+ assert(FileUtils.readFileToString(testFile) === testString)
+ }
+
+ test("rolling file appender - time-based rolling") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverIntervalMillis = 100
+ val durationMillis = 1000
+ val numRollovers = durationMillis / rolloverIntervalMillis
+ val textToAppend = (1 to numRollovers).map( _.toString * 10 )
+
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false),
+ new SparkConf(), 10)
+
+ testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis)
+ }
+
+ test("rolling file appender - size-based rolling") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverSize = 1000
+ val textToAppend = (1 to 3).map( _.toString * 1000 )
+
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new SizeBasedRollingPolicy(rolloverSize, false), new SparkConf(), 99)
+
+ val files = testRolling(appender, testOutputStream, textToAppend, 0)
+ files.foreach { file =>
+ logInfo(file.toString + ": " + file.length + " bytes")
+ assert(file.length <= rolloverSize)
+ }
+ }
+
+ test("rolling file appender - cleaning") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val conf = new SparkConf().set(RollingFileAppender.RETAINED_FILES_PROPERTY, "10")
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new SizeBasedRollingPolicy(1000, false), conf, 10)
+
+ // send data to appender through the input stream, and wait for the data to be written
+ val allGeneratedFiles = new HashSet[String]()
+ val items = (1 to 10).map { _.toString * 10000 }
+ for (i <- 0 until items.size) {
+ testOutputStream.write(items(i).getBytes("UTF8"))
+ testOutputStream.flush()
+ allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles(
+ testFile.getParentFile.toString, testFile.getName).map(_.toString)
+
+ Thread.sleep(10)
+ }
+ testOutputStream.close()
+ appender.awaitTermination()
+ logInfo("Appender closed")
+
+ // verify whether the earliest file has been deleted
+ val rolledOverFiles = allGeneratedFiles.filter { _ != testFile.toString }.toArray.sorted
+ logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" + rolledOverFiles.mkString("\n"))
+ assert(rolledOverFiles.size > 2)
+ val earliestRolledOverFile = rolledOverFiles.head
+ val existingRolledOverFiles = RollingFileAppender.getSortedRolledOverFiles(
+ testFile.getParentFile.toString, testFile.getName).map(_.toString)
+ logInfo("Existing rolled over files:\n" + existingRolledOverFiles.mkString("\n"))
+ assert(!existingRolledOverFiles.toSet.contains(earliestRolledOverFile))
+ }
+
+ test("file appender selection") {
+ // Test whether FileAppender.apply() returns the right type of FileAppender based
+ // on SparkConf settings.
+
+ def testAppenderSelection[ExpectedAppender: ClassTag, ExpectedRollingPolicy](
+ properties: Seq[(String, String)], expectedRollingPolicyParam: Long = -1): FileAppender = {
+
+ // Set spark conf properties
+ val conf = new SparkConf
+ properties.foreach { p =>
+ conf.set(p._1, p._2)
+ }
+
+ // Create and test file appender
+ val inputStream = new PipedInputStream(new PipedOutputStream())
+ val appender = FileAppender(inputStream, new File("stdout"), conf)
+ assert(appender.isInstanceOf[ExpectedAppender])
+ assert(appender.getClass.getSimpleName ===
+ classTag[ExpectedAppender].runtimeClass.getSimpleName)
+ if (appender.isInstanceOf[RollingFileAppender]) {
+ val rollingPolicy = appender.asInstanceOf[RollingFileAppender].rollingPolicy
+ rollingPolicy.isInstanceOf[ExpectedRollingPolicy]
+ val policyParam = if (rollingPolicy.isInstanceOf[TimeBasedRollingPolicy]) {
+ rollingPolicy.asInstanceOf[TimeBasedRollingPolicy].rolloverIntervalMillis
+ } else {
+ rollingPolicy.asInstanceOf[SizeBasedRollingPolicy].rolloverSizeBytes
+ }
+ assert(policyParam === expectedRollingPolicyParam)
+ }
+ appender
+ }
+
+ import RollingFileAppender._
+
+ def rollingStrategy(strategy: String) = Seq(STRATEGY_PROPERTY -> strategy)
+ def rollingSize(size: String) = Seq(SIZE_PROPERTY -> size)
+ def rollingInterval(interval: String) = Seq(INTERVAL_PROPERTY -> interval)
+
+ val msInDay = 24 * 60 * 60 * 1000L
+ val msInHour = 60 * 60 * 1000L
+ val msInMinute = 60 * 1000L
+
+ // test no strategy -> no rolling
+ testAppenderSelection[FileAppender, Any](Seq.empty)
+
+ // test time based rolling strategy
+ testAppenderSelection[RollingFileAppender, Any](rollingStrategy("time"), msInDay)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("daily"), msInDay)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("hourly"), msInHour)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("minutely"), msInMinute)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("123456789"), 123456789 * 1000L)
+ testAppenderSelection[FileAppender, Any](
+ rollingStrategy("time") ++ rollingInterval("xyz"))
+
+ // test size based rolling strategy
+ testAppenderSelection[RollingFileAppender, SizeBasedRollingPolicy](
+ rollingStrategy("size") ++ rollingSize("123456789"), 123456789)
+ testAppenderSelection[FileAppender, Any](rollingSize("xyz"))
+
+ // test illegal strategy
+ testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
+ }
+
+ /**
+ * Run the rolling file appender with data and see whether all the data was written correctly
+ * across rolled over files.
+ */
+ def testRolling(
+ appender: FileAppender,
+ outputStream: OutputStream,
+ textToAppend: Seq[String],
+ sleepTimeBetweenTexts: Long
+ ): Seq[File] = {
+ // send data to appender through the input stream, and wait for the data to be written
+ val expectedText = textToAppend.mkString("")
+ for (i <- 0 until textToAppend.size) {
+ outputStream.write(textToAppend(i).getBytes("UTF8"))
+ outputStream.flush()
+ Thread.sleep(sleepTimeBetweenTexts)
+ }
+ logInfo("Data sent to appender")
+ outputStream.close()
+ appender.awaitTermination()
+ logInfo("Appender closed")
+
+ // verify whether all the data written to rolled over files is same as expected
+ val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
+ testFile.getParentFile.toString, testFile.getName)
+ logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
+ assert(generatedFiles.size > 1)
+ val allText = generatedFiles.map { file =>
+ FileUtils.readFileToString(file)
+ }.mkString("")
+ assert(allText === expectedText)
+ generatedFiles
+ }
+
+ /** Delete all the generated rolled-over files */
+ def cleanup() {
+ testFile.getParentFile.listFiles.filter { file =>
+ file.getName.startsWith(testFile.getName)
+ }.foreach { _.delete() }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 0aad882ed7..1ee936bc78 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -140,6 +140,38 @@ class UtilsSuite extends FunSuite {
Utils.deleteRecursively(tmpDir2)
}
+ test("reading offset bytes across multiple files") {
+ val tmpDir = Files.createTempDir()
+ tmpDir.deleteOnExit()
+ val files = (1 to 3).map(i => new File(tmpDir, i.toString))
+ Files.write("0123456789", files(0), Charsets.UTF_8)
+ Files.write("abcdefghij", files(1), Charsets.UTF_8)
+ Files.write("ABCDEFGHIJ", files(2), Charsets.UTF_8)
+
+ // Read first few bytes in the 1st file
+ assert(Utils.offsetBytes(files, 0, 5) === "01234")
+
+ // Read bytes within the 1st file
+ assert(Utils.offsetBytes(files, 5, 8) === "567")
+
+ // Read bytes across 1st and 2nd file
+ assert(Utils.offsetBytes(files, 8, 18) === "89abcdefgh")
+
+ // Read bytes across 1st, 2nd and 3rd file
+ assert(Utils.offsetBytes(files, 5, 24) === "56789abcdefghijABCD")
+
+ // Read some nonexistent bytes in the beginning
+ assert(Utils.offsetBytes(files, -5, 18) === "0123456789abcdefgh")
+
+ // Read some nonexistent bytes at the end
+ assert(Utils.offsetBytes(files, 18, 35) === "ijABCDEFGHIJ")
+
+ // Read some nonexistent bytes on both ends
+ assert(Utils.offsetBytes(files, -5, 35) === "0123456789abcdefghijABCDEFGHIJ")
+
+ Utils.deleteRecursively(tmpDir)
+ }
+
test("deserialize long value") {
val testval : Long = 9730889947L
val bbuf = ByteBuffer.allocate(8)
diff --git a/docs/configuration.md b/docs/configuration.md
index 71fafa5734..b84104cc7e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -784,6 +784,45 @@ Apart from these, the following properties are also available, and may be useful
higher memory usage in Spark.
</td>
</tr>
+<tr>
+ <td><code>spark.executor.logs.rolling.strategy</code></td>
+ <td>(none)</td>
+ <td>
+ Set the strategy for rolling executor logs. It is disabled by default. It can
+ be set to "time" (time-based rolling) or "size" (size-based rolling). For "time",
+ use <code>spark.executor.logs.rolling.time.interval</code> to set the rolling interval.
+ For "size", use <code>spark.executor.logs.rolling.size.maxBytes</code> to set
+ the maximum file size for rolling.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.executor.logs.rolling.time.interval</code></td>
+ <td>daily</td>
+ <td>
+ Set the time interval at which the executor logs will be rolled over.
+ Rolling is disabled by default. Valid values are <code>daily</code>, <code>hourly</code>,
+ <code>minutely</code>, or any interval in seconds. See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+ for automatic cleaning of old logs.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.executor.logs.rolling.size.maxBytes</code></td>
+ <td>(none)</td>
+ <td>
+ Set the maximum size of the file, in bytes, beyond which the executor logs will be rolled over.
+ Rolling is disabled by default.
+ See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+ for automatic cleaning of old logs.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.executor.logs.rolling.maxRetainedFiles</code></td>
+ <td>(none)</td>
+ <td>
+ Sets the number of most recent rolled-over log files that will be retained by the system.
+ Older log files will be deleted. Disabled by default.
+ </td>
+</tr>
</table>
#### Cluster Managers
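Putting the documented properties together, a hedged illustration of the names and example values (values are illustrative; the snippet only shows the property names, e.g. as pasted into spark-shell, while in this patch ExecutorRunner reads them from the SparkConf of the standalone Worker, so in practice they must be visible to the worker process):

    import org.apache.spark.SparkConf

    // Example only: hourly rolling, keeping roughly the last three days of executor logs.
    val conf = new SparkConf()
      .set("spark.executor.logs.rolling.strategy", "time")
      .set("spark.executor.logs.rolling.time.interval", "hourly")
      .set("spark.executor.logs.rolling.maxRetainedFiles", "72")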