author     Tathagata Das <tathagata.das1565@gmail.com>  2014-06-10 20:22:02 -0700
committer  Patrick Wendell <pwendell@gmail.com>         2014-06-10 20:22:02 -0700
commit     4823bf470ec1b47a6f404834d4453e61d3dcbec9 (patch)
tree       af578bece5bb02fdf4a083abce3ce78b76233ff8
parent     29660443077619ee854025b8d0d3d64181724054 (diff)
[SPARK-1940] Enabling rolling of executor logs, and automatic cleanup of old executor logs
Currently, in the default log4j configuration, all the executor logs get sent to the file `[executor-working-dir]/stderr`. This does not allow the log files to be rolled over, so old logs cannot be removed.

Using log4j's RollingFileAppender allows the log4j logs to be rolled, but then all the logs go to a set of files other than `stdout` and `stderr`, so the logs are no longer visible in the Spark web UI, which only reads the files `stdout` and `stderr`. Furthermore, it still does not allow `stdout` and `stderr` to be cleared periodically in case a large amount of data gets written to them (e.g., by explicit `println` calls inside a map function).

This PR solves this by implementing a simple `RollingFileAppender` within Spark (disabled by default). When enabled (using the configuration parameter `spark.executor.rollingLogs.enabled`), the logs can be rolled over either by time interval (set with `spark.executor.rollingLogs.interval`, daily by default) or by log size (set with `spark.executor.rollingLogs.size`). Finally, old logs can be automatically deleted by specifying how many of the latest log files to keep (set with `spark.executor.rollingLogs.keepLastN`). The web UI has also been modified to show the logs across the rolled-over files.

You can test this locally (without waiting a whole day) by setting `spark.executor.rollingLogs.enabled=true` and `spark.executor.rollingLogs.interval=minutely`. Continuously generate logs by running Spark jobs, and the generated log files will look like this (`stderr` and `stdout` are the current files being written to):

```
stderr
stderr--2014-05-27--14-37
stderr--2014-05-27--14-47
stderr--2014-05-27--15-05
stdout
stdout--2014-05-27--14-47
```

The web UI should show the logs across these files.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #895 from tdas/rolling-logs and squashes the following commits:

fd8f87f [Tathagata Das] Minor change.
d326aee [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into rolling-logs
ad956c1 [Tathagata Das] Scala style fix.
1f0a6ec [Tathagata Das] Some more changes based on Patrick's PR comments.
c8bfe4e [Tathagata Das] Refactored FileAppender to a package spark.util.logging and broke up the file into multiple files. Changed configuration parameter names.
4224409 [Tathagata Das] Style fix.
108a9f8 [Tathagata Das] Added better constraint handling for rolling policies.
f7da977 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into rolling-logs
9134495 [Tathagata Das] Simplified rolling logs by removing Daily/Hourly/MinutelyRollingFileAppender, and removing the setting rollingLogs.enabled
312d874 [Tathagata Das] Minor fixes based on PR comments.
8a67d83 [Tathagata Das] Fixed comments.
b36cfd6 [Tathagata Das] Implemented RollingPolicy, TimeBasedRollingPolicy and SizeBasedRollingPolicy, and changed RollingFileAppender accordingly.
b7e8272 [Tathagata Das] Style fix.
374c9a9 [Tathagata Das] Added missing license.
24354ea [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into rolling-logs
6cc09c7 [Tathagata Das] Fixed bugs in rolling logs, and added more debug statements.
adf4910 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into rolling-logs
931f8fb [Tathagata Das] Changed log viewer in Spark web UI to handle rolling log files.
cb4fb6d [Tathagata Das] Added FileAppender and RollingFileAppender to generate rolling executor logs.
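Note that the configuration keys that ship in this patch are the `spark.executor.logs.rolling.*` properties defined in the `RollingFileAppender` companion object below; the `spark.executor.rollingLogs.*` names in the description reflect an earlier revision that was renamed during review. A minimal sketch of enabling time-based rolling, assuming the properties are set in the SparkConf that the Worker hands to `ExecutorRunner`:

```
import org.apache.spark.SparkConf

// Sketch only: property names are taken from the RollingFileAppender object in this patch.
// They are read from the conf that the Worker passes to ExecutorRunner, so on a standalone
// cluster they would normally be set in the workers' configuration rather than the driver's.
val conf = new SparkConf()
  .set("spark.executor.logs.rolling.strategy", "time")          // "time", "size", or unset to disable
  .set("spark.executor.logs.rolling.time.interval", "minutely") // daily | hourly | minutely | <seconds>
  .set("spark.executor.logs.rolling.maxRetainedFiles", "10")    // delete all but the 10 newest files
```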
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala      |  17
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala              |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala          |  78
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                        |  53
-rw-r--r--  core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala         | 180
-rw-r--r--  core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala  | 163
-rw-r--r--  core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala        | 139
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala          |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala  |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala            | 225
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala                   |  32
-rw-r--r--  docs/configuration.md                                                        |  39
12 files changed, 895 insertions, 40 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index d27e0e1f15..d09136de49 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -23,9 +23,10 @@ import akka.actor.ActorRef
import com.google.common.base.Charsets
import com.google.common.io.Files
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.logging.FileAppender
/**
* Manages the execution of one executor process.
@@ -42,12 +43,15 @@ private[spark] class ExecutorRunner(
val sparkHome: File,
val workDir: File,
val workerUrl: String,
+ val conf: SparkConf,
var state: ExecutorState.Value)
extends Logging {
val fullId = appId + "/" + execId
var workerThread: Thread = null
var process: Process = null
+ var stdoutAppender: FileAppender = null
+ var stderrAppender: FileAppender = null
// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
@@ -76,6 +80,13 @@ private[spark] class ExecutorRunner(
if (process != null) {
logInfo("Killing process!")
process.destroy()
+ process.waitFor()
+ if (stdoutAppender != null) {
+ stdoutAppender.stop()
+ }
+ if (stderrAppender != null) {
+ stderrAppender.stop()
+ }
val exitCode = process.waitFor()
worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
}
@@ -137,11 +148,11 @@ private[spark] class ExecutorRunner(
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
- CommandUtils.redirectStream(process.getInputStream, stdout)
+ stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, Charsets.UTF_8)
- CommandUtils.redirectStream(process.getErrorStream, stderr)
+ stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
// long-lived processes only. However, in the future, we might restart the executor a few
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 100de26170..a0ecaf709f 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -235,7 +235,7 @@ private[spark] class Worker(
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host,
appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
- workDir, akkaUrl, ExecutorState.RUNNING)
+ workDir, akkaUrl, conf, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index 8381f59672..6a5ffb1b71 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -24,8 +24,10 @@ import scala.xml.Node
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
-private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
+private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
private val worker = parent.worker
private val workDir = parent.workDir
@@ -39,21 +41,18 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
- val path = (appId, executorId, driverId) match {
+ val logDir = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
- s"${workDir.getPath}/$appId/$executorId/$logType"
+ s"${workDir.getPath}/$appId/$executorId/"
case (None, None, Some(d)) =>
- s"${workDir.getPath}/$driverId/$logType"
+ s"${workDir.getPath}/$driverId/"
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}
- val (startByte, endByte) = getByteRange(path, offset, byteLength)
- val file = new File(path)
- val logLength = file.length
-
- val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
- pre + Utils.offsetBytes(path, startByte, endByte)
+ val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
+ val pre = s"==== Bytes $startByte-$endByte of $logLength of $logDir$logType ====\n"
+ pre + logText
}
def render(request: HttpServletRequest): Seq[Node] = {
@@ -65,19 +64,16 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
- val (path, params) = (appId, executorId, driverId) match {
+ val (logDir, params) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
- (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+ (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
case (None, None, Some(d)) =>
- (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+ (s"${workDir.getPath}/$d/", s"driverId=$d")
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}
- val (startByte, endByte) = getByteRange(path, offset, byteLength)
- val file = new File(path)
- val logLength = file.length
- val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+ val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
@@ -127,23 +123,37 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
UIUtils.basicSparkPage(content, logType + " log page for " + appId)
}
- /** Determine the byte range for a log or log page. */
- private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
- val defaultBytes = 100 * 1024
- val maxBytes = 1024 * 1024
- val file = new File(path)
- val logLength = file.length()
- val getOffset = offset.getOrElse(logLength - defaultBytes)
- val startByte =
- if (getOffset < 0) {
- 0L
- } else if (getOffset > logLength) {
- logLength
- } else {
- getOffset
+ /** Get the part of the log files given the offset and desired length of bytes */
+ private def getLog(
+ logDirectory: String,
+ logType: String,
+ offsetOption: Option[Long],
+ byteLength: Int
+ ): (String, Long, Long, Long) = {
+ try {
+ val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
+ logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")
+
+ val totalLength = files.map { _.length }.sum
+ val offset = offsetOption.getOrElse(totalLength - byteLength)
+ val startIndex = {
+ if (offset < 0) {
+ 0L
+ } else if (offset > totalLength) {
+ totalLength
+ } else {
+ offset
+ }
}
- val logPageLength = math.min(byteLength, maxBytes)
- val endByte = math.min(startByte + logPageLength, logLength)
- (startByte, endByte)
+ val endIndex = math.min(startIndex + totalLength, totalLength)
+ logDebug(s"Getting log from $startIndex to $endIndex")
+ val logText = Utils.offsetBytes(files, startIndex, endIndex)
+ logDebug(s"Got log of length ${logText.length} bytes")
+ (logText, startIndex, endIndex, totalLength)
+ } catch {
+ case e: Exception =>
+ logError(s"Error getting $logType logs from directory $logDirectory", e)
+ ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3b1b6df089..4ce28bb0cf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -863,6 +863,59 @@ private[spark] object Utils extends Logging {
}
/**
+ * Return a string containing data across a set of files. The `startIndex`
+ * and `endIndex` is based on the cumulative size of all the files take in
+ * the given order. See figure below for more details.
+ */
+ def offsetBytes(files: Seq[File], start: Long, end: Long): String = {
+ val fileLengths = files.map { _.length }
+ val startIndex = math.max(start, 0)
+ val endIndex = math.min(end, fileLengths.sum)
+ val fileToLength = files.zip(fileLengths).toMap
+ logDebug("Log files: \n" + fileToLength.mkString("\n"))
+
+ val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
+ var sum = 0L
+ for (file <- files) {
+ val startIndexOfFile = sum
+ val endIndexOfFile = sum + fileToLength(file)
+ logDebug(s"Processing file $file, " +
+ s"with start index = $startIndexOfFile, end index = $endIndex")
+
+ /*
+ ____________
+ range 1: | |
+ | case A |
+
+ files: |==== file 1 ====|====== file 2 ======|===== file 3 =====|
+
+ | case B . case C . case D |
+ range 2: |___________.____________________.______________|
+ */
+
+ if (startIndex <= startIndexOfFile && endIndex >= endIndexOfFile) {
+ // Case C: read the whole file
+ stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, fileToLength(file)))
+ } else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) {
+ // Case A and B: read from [start of required range] to [end of file / end of range]
+ val effectiveStartIndex = startIndex - startIndexOfFile
+ val effectiveEndIndex = math.min(endIndex - startIndexOfFile, fileToLength(file))
+ stringBuffer.append(Utils.offsetBytes(
+ file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+ } else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) {
+ // Case D: read from [start of file] to [end of require range]
+ val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0)
+ val effectiveEndIndex = endIndex - startIndexOfFile
+ stringBuffer.append(Utils.offsetBytes(
+ file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
+ }
+ sum += fileToLength(file)
+ logDebug(s"After processing file $file, string built is ${stringBuffer.toString}}")
+ }
+ stringBuffer.toString
+ }
+
+ /**
* Clone an object using a Spark serializer.
*/
def clone[T: ClassTag](value: T, serializer: SerializerInstance): T = {
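The case analysis in `offsetBytes(files, start, end)` above can be illustrated with a small standalone sketch (not part of the patch) that computes, for each file, which byte slice of it falls inside the requested cumulative range:

```
// Standalone illustration of the range arithmetic used by Utils.offsetBytes(files, start, end):
// for each file, return (file index, start offset within file, end offset within file).
def slices(fileLengths: Seq[Long], start: Long, end: Long): Seq[(Int, Long, Long)] = {
  val s = math.max(start, 0L)
  val e = math.min(end, fileLengths.sum)
  var cursor = 0L
  fileLengths.zipWithIndex.flatMap { case (len, i) =>
    val fileStart = cursor                    // cumulative offset where this file begins
    cursor += len
    val from = math.max(s - fileStart, 0L)    // cases A and B start mid-file; C and D start at 0
    val until = math.min(e - fileStart, len)  // cases A and D end mid-file; B and C end at `len`
    if (from < until) Some((i, from, until)) else None  // files outside the range contribute nothing
  }
}

// With three files of 10 bytes each, the range [8, 18) spans the last 2 bytes of file 0
// and the first 8 bytes of file 1:
// slices(Seq(10L, 10L, 10L), 8, 18) == Seq((0, 8, 10), (1, 0, 8))
```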
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
new file mode 100644
index 0000000000..8e9c3036d0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{File, FileOutputStream, InputStream}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.{IntParam, Utils}
+
+/**
+ * Continuously appends the data from an input stream into the given file.
+ */
+private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192)
+ extends Logging {
+ @volatile private var outputStream: FileOutputStream = null
+ @volatile private var markedForStop = false // has the appender been asked to stopped
+ @volatile private var stopped = false // has the appender stopped
+
+ // Thread that reads the input stream and writes to file
+ private val writingThread = new Thread("File appending thread for " + file) {
+ setDaemon(true)
+ override def run() {
+ Utils.logUncaughtExceptions {
+ appendStreamToFile()
+ }
+ }
+ }
+ writingThread.start()
+
+ /**
+ * Wait for the appender to stop appending, either because input stream is closed
+ * or because of any error in appending
+ */
+ def awaitTermination() {
+ synchronized {
+ if (!stopped) {
+ wait()
+ }
+ }
+ }
+
+ /** Stop the appender */
+ def stop() {
+ markedForStop = true
+ }
+
+ /** Continuously read chunks from the input stream and append to the file */
+ protected def appendStreamToFile() {
+ try {
+ logDebug("Started appending thread")
+ openFile()
+ val buf = new Array[Byte](bufferSize)
+ var n = 0
+ while (!markedForStop && n != -1) {
+ n = inputStream.read(buf)
+ if (n != -1) {
+ appendToFile(buf, n)
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Error writing stream to file $file", e)
+ } finally {
+ closeFile()
+ synchronized {
+ stopped = true
+ notifyAll()
+ }
+ }
+ }
+
+ /** Append bytes to the file output stream */
+ protected def appendToFile(bytes: Array[Byte], len: Int) {
+ if (outputStream == null) {
+ openFile()
+ }
+ outputStream.write(bytes, 0, len)
+ }
+
+ /** Open the file output stream */
+ protected def openFile() {
+ outputStream = new FileOutputStream(file, false)
+ logDebug(s"Opened file $file")
+ }
+
+ /** Close the file output stream */
+ protected def closeFile() {
+ outputStream.flush()
+ outputStream.close()
+ logDebug(s"Closed file $file")
+ }
+}
+
+/**
+ * Companion object to [[org.apache.spark.util.logging.FileAppender]] which has helper
+ * functions to choose the correct type of FileAppender based on SparkConf configuration.
+ */
+private[spark] object FileAppender extends Logging {
+
+ /** Create the right appender based on Spark configuration */
+ def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
+
+ import RollingFileAppender._
+
+ val rollingStrategy = conf.get(STRATEGY_PROPERTY, STRATEGY_DEFAULT)
+ val rollingSizeBytes = conf.get(SIZE_PROPERTY, STRATEGY_DEFAULT)
+ val rollingInterval = conf.get(INTERVAL_PROPERTY, INTERVAL_DEFAULT)
+
+ def createTimeBasedAppender() = {
+ val validatedParams: Option[(Long, String)] = rollingInterval match {
+ case "daily" =>
+ logInfo(s"Rolling executor logs enabled for $file with daily rolling")
+ Some(24 * 60 * 60 * 1000L, "--YYYY-MM-dd")
+ case "hourly" =>
+ logInfo(s"Rolling executor logs enabled for $file with hourly rolling")
+ Some(60 * 60 * 1000L, "--YYYY-MM-dd--HH")
+ case "minutely" =>
+ logInfo(s"Rolling executor logs enabled for $file with rolling every minute")
+ Some(60 * 1000L, "--YYYY-MM-dd--HH-mm")
+ case IntParam(seconds) =>
+ logInfo(s"Rolling executor logs enabled for $file with rolling $seconds seconds")
+ Some(seconds * 1000L, "--YYYY-MM-dd--HH-mm-ss")
+ case _ =>
+ logWarning(s"Illegal interval for rolling executor logs [$rollingInterval], " +
+ s"rolling logs not enabled")
+ None
+ }
+ validatedParams.map {
+ case (interval, pattern) =>
+ new RollingFileAppender(
+ inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf)
+ }.getOrElse {
+ new FileAppender(inputStream, file)
+ }
+ }
+
+ def createSizeBasedAppender() = {
+ rollingSizeBytes match {
+ case IntParam(bytes) =>
+ logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes")
+ new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf)
+ case _ =>
+ logWarning(
+ s"Illegal size [$rollingSizeBytes] for rolling executor logs, rolling logs not enabled")
+ new FileAppender(inputStream, file)
+ }
+ }
+
+ rollingStrategy match {
+ case "" =>
+ new FileAppender(inputStream, file)
+ case "time" =>
+ createTimeBasedAppender()
+ case "size" =>
+ createSizeBasedAppender()
+ case _ =>
+ logWarning(
+ s"Illegal strategy [$rollingStrategy] for rolling executor logs, " +
+ s"rolling logs not enabled")
+ new FileAppender(inputStream, file)
+ }
+ }
+}
+
+
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
new file mode 100644
index 0000000000..1bbbd20cf0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.io.{File, FileFilter, InputStream}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.SparkConf
+import RollingFileAppender._
+
+/**
+ * Continuously appends data from input stream into the given file, and rolls
+ * over the file after the given interval. The rolled over files are named
+ * based on the given pattern.
+ *
+ * @param inputStream Input stream to read data from
+ * @param activeFile File to write data to
+ * @param rollingPolicy Policy based on which files will be rolled over.
+ * @param conf SparkConf that is used to pass on extra configurations
+ * @param bufferSize Optional buffer size. Used mainly for testing.
+ */
+private[spark] class RollingFileAppender(
+ inputStream: InputStream,
+ activeFile: File,
+ val rollingPolicy: RollingPolicy,
+ conf: SparkConf,
+ bufferSize: Int = DEFAULT_BUFFER_SIZE
+ ) extends FileAppender(inputStream, activeFile, bufferSize) {
+
+ private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
+
+ /** Stop the appender */
+ override def stop() {
+ super.stop()
+ }
+
+ /** Append bytes to file after rolling over is necessary */
+ override protected def appendToFile(bytes: Array[Byte], len: Int) {
+ if (rollingPolicy.shouldRollover(len)) {
+ rollover()
+ rollingPolicy.rolledOver()
+ }
+ super.appendToFile(bytes, len)
+ rollingPolicy.bytesWritten(len)
+ }
+
+ /** Rollover the file, by closing the output stream and moving it over */
+ private def rollover() {
+ try {
+ closeFile()
+ moveFile()
+ openFile()
+ if (maxRetainedFiles > 0) {
+ deleteOldFiles()
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Error rolling over $activeFile", e)
+ }
+ }
+
+ /** Move the active log file to a new rollover file */
+ private def moveFile() {
+ val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
+ val rolloverFile = new File(
+ activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
+ try {
+ logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
+ if (activeFile.exists) {
+ if (!rolloverFile.exists) {
+ FileUtils.moveFile(activeFile, rolloverFile)
+ logInfo(s"Rolled over $activeFile to $rolloverFile")
+ } else {
+ // In case the rollover file name clashes, make a unique file name.
+ // The resultant file names are long and ugly, so this is used only
+ // if there is a name collision. This can be avoided by the using
+ // the right pattern such that name collisions do not occur.
+ var i = 0
+ var altRolloverFile: File = null
+ do {
+ altRolloverFile = new File(activeFile.getParent,
+ s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
+ i += 1
+ } while (i < 10000 && altRolloverFile.exists)
+
+ logWarning(s"Rollover file $rolloverFile already exists, " +
+ s"rolled over $activeFile to file $altRolloverFile")
+ FileUtils.moveFile(activeFile, altRolloverFile)
+ }
+ } else {
+ logWarning(s"File $activeFile does not exist")
+ }
+ }
+ }
+
+ /** Retain only last few files */
+ private[util] def deleteOldFiles() {
+ try {
+ val rolledoverFiles = activeFile.getParentFile.listFiles(new FileFilter {
+ def accept(f: File): Boolean = {
+ f.getName.startsWith(activeFile.getName) && f != activeFile
+ }
+ }).sorted
+ val filesToBeDeleted = rolledoverFiles.take(
+ math.max(0, rolledoverFiles.size - maxRetainedFiles))
+ filesToBeDeleted.foreach { file =>
+ logInfo(s"Deleting file executor log file ${file.getAbsolutePath}")
+ file.delete()
+ }
+ } catch {
+ case e: Exception =>
+ logError("Error cleaning logs in directory " + activeFile.getParentFile.getAbsolutePath, e)
+ }
+ }
+}
+
+/**
+ * Companion object to [[org.apache.spark.util.logging.RollingFileAppender]]. Defines
+ * names of configurations that configure rolling file appenders.
+ */
+private[spark] object RollingFileAppender {
+ val STRATEGY_PROPERTY = "spark.executor.logs.rolling.strategy"
+ val STRATEGY_DEFAULT = ""
+ val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval"
+ val INTERVAL_DEFAULT = "daily"
+ val SIZE_PROPERTY = "spark.executor.logs.rolling.size.maxBytes"
+ val SIZE_DEFAULT = (1024 * 1024).toString
+ val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
+ val DEFAULT_BUFFER_SIZE = 8192
+
+ /**
+ * Get the sorted list of rolled over files. This assumes that the all the rolled
+ * over file names are prefixed with the `activeFileName`, and the active file
+ * name has the latest logs. So it sorts all the rolled over logs (that are
+ * prefixed with `activeFileName`) and appends the active file
+ */
+ def getSortedRolledOverFiles(directory: String, activeFileName: String): Seq[File] = {
+ val rolledOverFiles = new File(directory).getAbsoluteFile.listFiles.filter { file =>
+ val fileName = file.getName
+ fileName.startsWith(activeFileName) && fileName != activeFileName
+ }.sorted
+ val activeFile = {
+ val file = new File(directory, activeFileName).getAbsoluteFile
+ if (file.exists) Some(file) else None
+ }
+ rolledOverFiles ++ activeFile
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala
new file mode 100644
index 0000000000..84e5c3c917
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.logging
+
+import java.text.SimpleDateFormat
+import java.util.Calendar
+
+import org.apache.spark.Logging
+
+/**
+ * Defines the policy based on which [[org.apache.spark.util.logging.RollingFileAppender]] will
+ * generate rolling files.
+ */
+private[spark] trait RollingPolicy {
+
+ /** Whether rollover should be initiated at this moment */
+ def shouldRollover(bytesToBeWritten: Long): Boolean
+
+ /** Notify that rollover has occurred */
+ def rolledOver()
+
+ /** Notify that bytes have been written */
+ def bytesWritten(bytes: Long)
+
+ /** Get the desired name of the rollover file */
+ def generateRolledOverFileSuffix(): String
+}
+
+/**
+ * Defines a [[org.apache.spark.util.logging.RollingPolicy]] by which files will be rolled
+ * over at a fixed interval.
+ */
+private[spark] class TimeBasedRollingPolicy(
+ var rolloverIntervalMillis: Long,
+ rollingFileSuffixPattern: String,
+ checkIntervalConstraint: Boolean = true // set to false while testing
+ ) extends RollingPolicy with Logging {
+
+ import TimeBasedRollingPolicy._
+ if (checkIntervalConstraint && rolloverIntervalMillis < MINIMUM_INTERVAL_SECONDS * 1000L) {
+ logWarning(s"Rolling interval [${rolloverIntervalMillis/1000L} seconds] is too small. " +
+ s"Setting the interval to the acceptable minimum of $MINIMUM_INTERVAL_SECONDS seconds.")
+ rolloverIntervalMillis = MINIMUM_INTERVAL_SECONDS * 1000L
+ }
+
+ @volatile private var nextRolloverTime = calculateNextRolloverTime()
+ private val formatter = new SimpleDateFormat(rollingFileSuffixPattern)
+
+ /** Should rollover if current time has exceeded next rollover time */
+ def shouldRollover(bytesToBeWritten: Long): Boolean = {
+ System.currentTimeMillis > nextRolloverTime
+ }
+
+ /** Rollover has occurred, so find the next time to rollover */
+ def rolledOver() {
+ nextRolloverTime = calculateNextRolloverTime()
+ logDebug(s"Current time: ${System.currentTimeMillis}, next rollover time: " + nextRolloverTime)
+ }
+
+ def bytesWritten(bytes: Long) { } // nothing to do
+
+ private def calculateNextRolloverTime(): Long = {
+ val now = System.currentTimeMillis()
+ val targetTime = (
+ math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis
+ ).toLong
+ logDebug(s"Next rollover time is $targetTime")
+ targetTime
+ }
+
+ def generateRolledOverFileSuffix(): String = {
+ formatter.format(Calendar.getInstance.getTime)
+ }
+}
+
+private[spark] object TimeBasedRollingPolicy {
+ val MINIMUM_INTERVAL_SECONDS = 60L // 1 minute
+}
+
+/**
+ * Defines a [[org.apache.spark.util.logging.RollingPolicy]] by which files will be rolled
+ * over after reaching a particular size.
+ */
+private[spark] class SizeBasedRollingPolicy(
+ var rolloverSizeBytes: Long,
+ checkSizeConstraint: Boolean = true // set to false while testing
+ ) extends RollingPolicy with Logging {
+
+ import SizeBasedRollingPolicy._
+ if (checkSizeConstraint && rolloverSizeBytes < MINIMUM_SIZE_BYTES) {
+ logWarning(s"Rolling size [$rolloverSizeBytes bytes] is too small. " +
+ s"Setting the size to the acceptable minimum of $MINIMUM_SIZE_BYTES bytes.")
+ rolloverSizeBytes = MINIMUM_SIZE_BYTES
+ }
+
+ @volatile private var bytesWrittenSinceRollover = 0L
+ val formatter = new SimpleDateFormat("--YYYY-MM-dd--HH-mm-ss--SSSS")
+
+ /** Should rollover if the next set of bytes is going to exceed the size limit */
+ def shouldRollover(bytesToBeWritten: Long): Boolean = {
+ logInfo(s"$bytesToBeWritten + $bytesWrittenSinceRollover > $rolloverSizeBytes")
+ bytesToBeWritten + bytesWrittenSinceRollover > rolloverSizeBytes
+ }
+
+ /** Rollover has occurred, so reset the counter */
+ def rolledOver() {
+ bytesWrittenSinceRollover = 0
+ }
+
+ /** Increment the bytes that have been written in the current file */
+ def bytesWritten(bytes: Long) {
+ bytesWrittenSinceRollover += bytes
+ }
+
+ /** Get the desired name of the rollover file */
+ def generateRolledOverFileSuffix(): String = {
+ formatter.format(Calendar.getInstance.getTime)
+ }
+}
+
+private[spark] object SizeBasedRollingPolicy {
+ val MINIMUM_SIZE_BYTES = RollingFileAppender.DEFAULT_BUFFER_SIZE * 10
+}
+
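The `calculateNextRolloverTime` arithmetic in `TimeBasedRollingPolicy` above rounds the current time up to the next multiple of the rolling interval, so rollovers align with interval boundaries of the epoch clock. A small worked sketch with an illustrative timestamp:

```
// Illustration of the ceiling arithmetic in calculateNextRolloverTime (values are made up).
val intervalMs = 60L * 60 * 1000   // hourly rolling
val now = 1402434000123L           // an arbitrary epoch timestamp in milliseconds
val next = (math.ceil(now.toDouble / intervalMs) * intervalMs).toLong
// `next` is the smallest multiple of intervalMs that is >= now; here next - now == 3599877 ms,
// i.e. just under one hour until the first rollover, and roughly hourly thereafter.
```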
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index bfae32dae0..01ab2d5493 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.SparkConf
class JsonProtocolSuite extends FunSuite {
@@ -116,7 +117,8 @@ class JsonProtocolSuite extends FunSuite {
}
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
- new File("sparkHome"), new File("workDir"), "akka://worker", ExecutorState.RUNNING)
+ new File("sparkHome"), new File("workDir"), "akka://worker",
+ new SparkConf, ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 8ae387fa0b..e5f748d555 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.scalatest.FunSuite
import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.SparkConf
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
@@ -32,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
- f("ooga"), "blah", ExecutorState.RUNNING)
+ f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
assert(er.getCommandSeq.last === appId)
}
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
new file mode 100644
index 0000000000..53d7f5c607
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io._
+
+import scala.collection.mutable.HashSet
+import scala.reflect._
+
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.spark.{Logging, SparkConf}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy, FileAppender}
+
+class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
+
+ val testFile = new File("FileAppenderSuite-test-" + System.currentTimeMillis).getAbsoluteFile
+
+ before {
+ cleanup()
+ }
+
+ after {
+ cleanup()
+ }
+
+ test("basic file appender") {
+ val testString = (1 to 1000).mkString(", ")
+ val inputStream = IOUtils.toInputStream(testString)
+ val appender = new FileAppender(inputStream, testFile)
+ inputStream.close()
+ appender.awaitTermination()
+ assert(FileUtils.readFileToString(testFile) === testString)
+ }
+
+ test("rolling file appender - time-based rolling") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverIntervalMillis = 100
+ val durationMillis = 1000
+ val numRollovers = durationMillis / rolloverIntervalMillis
+ val textToAppend = (1 to numRollovers).map( _.toString * 10 )
+
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false),
+ new SparkConf(), 10)
+
+ testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis)
+ }
+
+ test("rolling file appender - size-based rolling") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverSize = 1000
+ val textToAppend = (1 to 3).map( _.toString * 1000 )
+
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new SizeBasedRollingPolicy(rolloverSize, false), new SparkConf(), 99)
+
+ val files = testRolling(appender, testOutputStream, textToAppend, 0)
+ files.foreach { file =>
+ logInfo(file.toString + ": " + file.length + " bytes")
+ assert(file.length <= rolloverSize)
+ }
+ }
+
+ test("rolling file appender - cleaning") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val conf = new SparkConf().set(RollingFileAppender.RETAINED_FILES_PROPERTY, "10")
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new SizeBasedRollingPolicy(1000, false), conf, 10)
+
+ // send data to appender through the input stream, and wait for the data to be written
+ val allGeneratedFiles = new HashSet[String]()
+ val items = (1 to 10).map { _.toString * 10000 }
+ for (i <- 0 until items.size) {
+ testOutputStream.write(items(i).getBytes("UTF8"))
+ testOutputStream.flush()
+ allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles(
+ testFile.getParentFile.toString, testFile.getName).map(_.toString)
+
+ Thread.sleep(10)
+ }
+ testOutputStream.close()
+ appender.awaitTermination()
+ logInfo("Appender closed")
+
+ // verify whether the earliest file has been deleted
+ val rolledOverFiles = allGeneratedFiles.filter { _ != testFile.toString }.toArray.sorted
+ logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" + rolledOverFiles.mkString("\n"))
+ assert(rolledOverFiles.size > 2)
+ val earliestRolledOverFile = rolledOverFiles.head
+ val existingRolledOverFiles = RollingFileAppender.getSortedRolledOverFiles(
+ testFile.getParentFile.toString, testFile.getName).map(_.toString)
+ logInfo("Existing rolled over files:\n" + existingRolledOverFiles.mkString("\n"))
+ assert(!existingRolledOverFiles.toSet.contains(earliestRolledOverFile))
+ }
+
+ test("file appender selection") {
+ // Test whether FileAppender.apply() returns the right type of the FileAppender based
+ // on SparkConf settings.
+
+ def testAppenderSelection[ExpectedAppender: ClassTag, ExpectedRollingPolicy](
+ properties: Seq[(String, String)], expectedRollingPolicyParam: Long = -1): FileAppender = {
+
+ // Set spark conf properties
+ val conf = new SparkConf
+ properties.foreach { p =>
+ conf.set(p._1, p._2)
+ }
+
+ // Create and test file appender
+ val inputStream = new PipedInputStream(new PipedOutputStream())
+ val appender = FileAppender(inputStream, new File("stdout"), conf)
+ assert(appender.isInstanceOf[ExpectedAppender])
+ assert(appender.getClass.getSimpleName ===
+ classTag[ExpectedAppender].runtimeClass.getSimpleName)
+ if (appender.isInstanceOf[RollingFileAppender]) {
+ val rollingPolicy = appender.asInstanceOf[RollingFileAppender].rollingPolicy
+ rollingPolicy.isInstanceOf[ExpectedRollingPolicy]
+ val policyParam = if (rollingPolicy.isInstanceOf[TimeBasedRollingPolicy]) {
+ rollingPolicy.asInstanceOf[TimeBasedRollingPolicy].rolloverIntervalMillis
+ } else {
+ rollingPolicy.asInstanceOf[SizeBasedRollingPolicy].rolloverSizeBytes
+ }
+ assert(policyParam === expectedRollingPolicyParam)
+ }
+ appender
+ }
+
+ import RollingFileAppender._
+
+ def rollingStrategy(strategy: String) = Seq(STRATEGY_PROPERTY -> strategy)
+ def rollingSize(size: String) = Seq(SIZE_PROPERTY -> size)
+ def rollingInterval(interval: String) = Seq(INTERVAL_PROPERTY -> interval)
+
+ val msInDay = 24 * 60 * 60 * 1000L
+ val msInHour = 60 * 60 * 1000L
+ val msInMinute = 60 * 1000L
+
+ // test no strategy -> no rolling
+ testAppenderSelection[FileAppender, Any](Seq.empty)
+
+ // test time based rolling strategy
+ testAppenderSelection[RollingFileAppender, Any](rollingStrategy("time"), msInDay)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("daily"), msInDay)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("hourly"), msInHour)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("minutely"), msInMinute)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("123456789"), 123456789 * 1000L)
+ testAppenderSelection[FileAppender, Any](
+ rollingStrategy("time") ++ rollingInterval("xyz"))
+
+ // test size based rolling strategy
+ testAppenderSelection[RollingFileAppender, SizeBasedRollingPolicy](
+ rollingStrategy("size") ++ rollingSize("123456789"), 123456789)
+ testAppenderSelection[FileAppender, Any](rollingSize("xyz"))
+
+ // test illegal strategy
+ testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
+ }
+
+ /**
+ * Run the rolling file appender with data and see whether all the data was written correctly
+ * across rolled over files.
+ */
+ def testRolling(
+ appender: FileAppender,
+ outputStream: OutputStream,
+ textToAppend: Seq[String],
+ sleepTimeBetweenTexts: Long
+ ): Seq[File] = {
+ // send data to appender through the input stream, and wait for the data to be written
+ val expectedText = textToAppend.mkString("")
+ for (i <- 0 until textToAppend.size) {
+ outputStream.write(textToAppend(i).getBytes("UTF8"))
+ outputStream.flush()
+ Thread.sleep(sleepTimeBetweenTexts)
+ }
+ logInfo("Data sent to appender")
+ outputStream.close()
+ appender.awaitTermination()
+ logInfo("Appender closed")
+
+ // verify whether all the data written to rolled over files is same as expected
+ val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
+ testFile.getParentFile.toString, testFile.getName)
+ logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
+ assert(generatedFiles.size > 1)
+ val allText = generatedFiles.map { file =>
+ FileUtils.readFileToString(file)
+ }.mkString("")
+ assert(allText === expectedText)
+ generatedFiles
+ }
+
+ /** Delete all the generated rolledover files */
+ def cleanup() {
+ testFile.getParentFile.listFiles.filter { file =>
+ file.getName.startsWith(testFile.getName)
+ }.foreach { _.delete() }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 0aad882ed7..1ee936bc78 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -140,6 +140,38 @@ class UtilsSuite extends FunSuite {
Utils.deleteRecursively(tmpDir2)
}
+ test("reading offset bytes across multiple files") {
+ val tmpDir = Files.createTempDir()
+ tmpDir.deleteOnExit()
+ val files = (1 to 3).map(i => new File(tmpDir, i.toString))
+ Files.write("0123456789", files(0), Charsets.UTF_8)
+ Files.write("abcdefghij", files(1), Charsets.UTF_8)
+ Files.write("ABCDEFGHIJ", files(2), Charsets.UTF_8)
+
+ // Read first few bytes in the 1st file
+ assert(Utils.offsetBytes(files, 0, 5) === "01234")
+
+ // Read bytes within the 1st file
+ assert(Utils.offsetBytes(files, 5, 8) === "567")
+
+ // Read bytes across 1st and 2nd file
+ assert(Utils.offsetBytes(files, 8, 18) === "89abcdefgh")
+
+ // Read bytes across 1st, 2nd and 3rd file
+ assert(Utils.offsetBytes(files, 5, 24) === "56789abcdefghijABCD")
+
+ // Read some nonexistent bytes in the beginning
+ assert(Utils.offsetBytes(files, -5, 18) === "0123456789abcdefgh")
+
+ // Read some nonexistent bytes at the end
+ assert(Utils.offsetBytes(files, 18, 35) === "ijABCDEFGHIJ")
+
+ // Read some nonexistent bytes on both ends
+ assert(Utils.offsetBytes(files, -5, 35) === "0123456789abcdefghijABCDEFGHIJ")
+
+ Utils.deleteRecursively(tmpDir)
+ }
+
test("deserialize long value") {
val testval : Long = 9730889947L
val bbuf = ByteBuffer.allocate(8)
diff --git a/docs/configuration.md b/docs/configuration.md
index 71fafa5734..b84104cc7e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -784,6 +784,45 @@ Apart from these, the following properties are also available, and may be useful
higher memory usage in Spark.
</td>
</tr>
+<tr>
+ <td><code>spark.executor.logs.rolling.strategy</code></td>
+ <td>(none)</td>
+ <td>
+ Set the strategy of rolling of executor logs. By default it is disabled. It can
+ be set to "time" (time-based rolling) or "size" (size-based rolling). For "time",
+ use <code>spark.executor.logs.rolling.time.interval</code> to set the rolling interval.
+ For "size", use <code>spark.executor.logs.rolling.size.maxBytes</code> to set
+ the maximum file size for rolling.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.executor.logs.rolling.time.interval</code></td>
+ <td>daily</td>
+ <td>
+ Set the time interval by which the executor logs will be rolled over.
+ Rolling is disabled by default. Valid values are `daily`, `hourly`, `minutely` or
+ any interval in seconds. See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+ for automatic cleaning of old logs.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.executor.logs.rolling.size.maxBytes</code></td>
+ <td>(none)</td>
+ <td>
+ Set the max size of the file by which the executor logs will be rolled over.
+ Rolling is disabled by default. Value is set in terms of bytes.
+ See <code>spark.executor.logs.rolling.maxRetainedFiles</code>
+ for automatic cleaning of old logs.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.executor.logs.rolling.maxRetainedFiles</code></td>
+ <td>(none)</td>
+ <td>
+ Sets the number of latest rolling log files that are going to be retained by the system.
+ Older log files will be deleted. Disabled by default.
+ </td>
+</tr>
</table>
#### Cluster Managers
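As a complement to the configuration table added above, a hedged sketch of size-based rolling with automatic cleanup; since the Worker passes its own SparkConf to `ExecutorRunner` in this patch, these are typically worker-side settings:

```
import org.apache.spark.SparkConf

// Sketch: roll executor logs roughly every 10 MB and keep only the 5 newest rolled-over files.
val conf = new SparkConf()
  .set("spark.executor.logs.rolling.strategy", "size")
  .set("spark.executor.logs.rolling.size.maxBytes", (10 * 1024 * 1024).toString)
  .set("spark.executor.logs.rolling.maxRetainedFiles", "5")
```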