diff options
12 files changed, 895 insertions, 40 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index d27e0e1f15..d09136de49 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -23,9 +23,10 @@ import akka.actor.ActorRef import com.google.common.base.Charsets import com.google.common.io.Files -import org.apache.spark.Logging +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged +import org.apache.spark.util.logging.FileAppender /** * Manages the execution of one executor process. @@ -42,12 +43,15 @@ private[spark] class ExecutorRunner( val sparkHome: File, val workDir: File, val workerUrl: String, + val conf: SparkConf, var state: ExecutorState.Value) extends Logging { val fullId = appId + "/" + execId var workerThread: Thread = null var process: Process = null + var stdoutAppender: FileAppender = null + var stderrAppender: FileAppender = null // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might // make sense to remove this in the future. @@ -76,6 +80,13 @@ private[spark] class ExecutorRunner( if (process != null) { logInfo("Killing process!") process.destroy() + process.waitFor() + if (stdoutAppender != null) { + stdoutAppender.stop() + } + if (stderrAppender != null) { + stderrAppender.stop() + } val exitCode = process.waitFor() worker ! 
ExecutorStateChanged(appId, execId, state, message, Some(exitCode)) } @@ -137,11 +148,11 @@ private[spark] class ExecutorRunner( // Redirect its stdout and stderr to files val stdout = new File(executorDir, "stdout") - CommandUtils.redirectStream(process.getInputStream, stdout) + stdoutAppender = FileAppender(process.getInputStream, stdout, conf) val stderr = new File(executorDir, "stderr") Files.write(header, stderr, Charsets.UTF_8) - CommandUtils.redirectStream(process.getErrorStream, stderr) + stderrAppender = FileAppender(process.getErrorStream, stderr, conf) // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run // long-lived processes only. However, in the future, we might restart the executor a few diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 100de26170..a0ecaf709f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -235,7 +235,7 @@ private[spark] class Worker( val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host, appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), - workDir, akkaUrl, ExecutorState.RUNNING) + workDir, akkaUrl, conf, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala index 8381f59672..6a5ffb1b71 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -24,8 +24,10 @@ import scala.xml.Node import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils +import org.apache.spark.Logging +import 
org.apache.spark.util.logging.{FileAppender, RollingFileAppender} -private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") { +private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging { private val worker = parent.worker private val workDir = parent.workDir @@ -39,21 +41,18 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") { val offset = Option(request.getParameter("offset")).map(_.toLong) val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - val path = (appId, executorId, driverId) match { + val logDir = (appId, executorId, driverId) match { case (Some(a), Some(e), None) => - s"${workDir.getPath}/$appId/$executorId/$logType" + s"${workDir.getPath}/$appId/$executorId/" case (None, None, Some(d)) => - s"${workDir.getPath}/$driverId/$logType" + s"${workDir.getPath}/$driverId/" case _ => throw new Exception("Request must specify either application or driver identifiers") } - val (startByte, endByte) = getByteRange(path, offset, byteLength) - val file = new File(path) - val logLength = file.length - - val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n" - pre + Utils.offsetBytes(path, startByte, endByte) + val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength) + val pre = s"==== Bytes $startByte-$endByte of $logLength of $logDir$logType ====\n" + pre + logText } def render(request: HttpServletRequest): Seq[Node] = { @@ -65,19 +64,16 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") { val offset = Option(request.getParameter("offset")).map(_.toLong) val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - val (path, params) = (appId, executorId, driverId) match { + val (logDir, params) = (appId, executorId, driverId) match { case (Some(a), Some(e), None) => - (s"${workDir.getPath}/$a/$e/$logType", 
s"appId=$a&executorId=$e") + (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e") case (None, None, Some(d)) => - (s"${workDir.getPath}/$d/$logType", s"driverId=$d") + (s"${workDir.getPath}/$d/", s"driverId=$d") case _ => throw new Exception("Request must specify either application or driver identifiers") } - val (startByte, endByte) = getByteRange(path, offset, byteLength) - val file = new File(path) - val logLength = file.length - val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node> + val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength) val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p> val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span> @@ -127,23 +123,37 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") { UIUtils.basicSparkPage(content, logType + " log page for " + appId) } - /** Determine the byte range for a log or log page. 
*/ - private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = { - val defaultBytes = 100 * 1024 - val maxBytes = 1024 * 1024 - val file = new File(path) - val logLength = file.length() - val getOffset = offset.getOrElse(logLength - defaultBytes) - val startByte = - if (getOffset < 0) { - 0L - } else if (getOffset > logLength) { - logLength - } else { - getOffset + /** Get the part of the log files given the offset and desired length of bytes */ + private def getLog( + logDirectory: String, + logType: String, + offsetOption: Option[Long], + byteLength: Int + ): (String, Long, Long, Long) = { + try { + val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType) + logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}") + + val totalLength = files.map { _.length }.sum + val offset = offsetOption.getOrElse(totalLength - byteLength) + val startIndex = { + if (offset < 0) { + 0L + } else if (offset > totalLength) { + totalLength + } else { + offset + } } - val logPageLength = math.min(byteLength, maxBytes) - val endByte = math.min(startByte + logPageLength, logLength) - (startByte, endByte) + val endIndex = math.min(startIndex + totalLength, totalLength) + logDebug(s"Getting log from $startIndex to $endIndex") + val logText = Utils.offsetBytes(files, startIndex, endIndex) + logDebug(s"Got log of length ${logText.length} bytes") + (logText, startIndex, endIndex, totalLength) + } catch { + case e: Exception => + logError(s"Error getting $logType logs from directory $logDirectory", e) + ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) + } } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3b1b6df089..4ce28bb0cf 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -863,6 +863,59 @@ private[spark] object Utils 
extends Logging { } /** + * Return a string containing data across a set of files. The `startIndex` + * and `endIndex` is based on the cumulative size of all the files take in + * the given order. See figure below for more details. + */ + def offsetBytes(files: Seq[File], start: Long, end: Long): String = { + val fileLengths = files.map { _.length } + val startIndex = math.max(start, 0) + val endIndex = math.min(end, fileLengths.sum) + val fileToLength = files.zip(fileLengths).toMap + logDebug("Log files: \n" + fileToLength.mkString("\n")) + + val stringBuffer = new StringBuffer((endIndex - startIndex).toInt) + var sum = 0L + for (file <- files) { + val startIndexOfFile = sum + val endIndexOfFile = sum + fileToLength(file) + logDebug(s"Processing file $file, " + + s"with start index = $startIndexOfFile, end index = $endIndex") + + /* + ____________ + range 1: | | + | case A | + + files: |==== file 1 ====|====== file 2 ======|===== file 3 =====| + + | case B . case C . case D | + range 2: |___________.____________________.______________| + */ + + if (startIndex <= startIndexOfFile && endIndex >= endIndexOfFile) { + // Case C: read the whole file + stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, fileToLength(file))) + } else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) { + // Case A and B: read from [start of required range] to [end of file / end of range] + val effectiveStartIndex = startIndex - startIndexOfFile + val effectiveEndIndex = math.min(endIndex - startIndexOfFile, fileToLength(file)) + stringBuffer.append(Utils.offsetBytes( + file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex)) + } else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) { + // Case D: read from [start of file] to [end of require range] + val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0) + val effectiveEndIndex = endIndex - startIndexOfFile + stringBuffer.append(Utils.offsetBytes( + file.getAbsolutePath, 
effectiveStartIndex, effectiveEndIndex)) + } + sum += fileToLength(file) + logDebug(s"After processing file $file, string built is ${stringBuffer.toString}}") + } + stringBuffer.toString + } + + /** * Clone an object using a Spark serializer. */ def clone[T: ClassTag](value: T, serializer: SerializerInstance): T = { diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala new file mode 100644 index 0000000000..8e9c3036d0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.logging + +import java.io.{File, FileOutputStream, InputStream} + +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.util.{IntParam, Utils} + +/** + * Continuously appends the data from an input stream into the given file. 
+ */ +private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192) + extends Logging { + @volatile private var outputStream: FileOutputStream = null + @volatile private var markedForStop = false // has the appender been asked to stopped + @volatile private var stopped = false // has the appender stopped + + // Thread that reads the input stream and writes to file + private val writingThread = new Thread("File appending thread for " + file) { + setDaemon(true) + override def run() { + Utils.logUncaughtExceptions { + appendStreamToFile() + } + } + } + writingThread.start() + + /** + * Wait for the appender to stop appending, either because input stream is closed + * or because of any error in appending + */ + def awaitTermination() { + synchronized { + if (!stopped) { + wait() + } + } + } + + /** Stop the appender */ + def stop() { + markedForStop = true + } + + /** Continuously read chunks from the input stream and append to the file */ + protected def appendStreamToFile() { + try { + logDebug("Started appending thread") + openFile() + val buf = new Array[Byte](bufferSize) + var n = 0 + while (!markedForStop && n != -1) { + n = inputStream.read(buf) + if (n != -1) { + appendToFile(buf, n) + } + } + } catch { + case e: Exception => + logError(s"Error writing stream to file $file", e) + } finally { + closeFile() + synchronized { + stopped = true + notifyAll() + } + } + } + + /** Append bytes to the file output stream */ + protected def appendToFile(bytes: Array[Byte], len: Int) { + if (outputStream == null) { + openFile() + } + outputStream.write(bytes, 0, len) + } + + /** Open the file output stream */ + protected def openFile() { + outputStream = new FileOutputStream(file, false) + logDebug(s"Opened file $file") + } + + /** Close the file output stream */ + protected def closeFile() { + outputStream.flush() + outputStream.close() + logDebug(s"Closed file $file") + } +} + +/** + * Companion object to 
[[org.apache.spark.util.logging.FileAppender]] which has helper + * functions to choose the correct type of FileAppender based on SparkConf configuration. + */ +private[spark] object FileAppender extends Logging { + + /** Create the right appender based on Spark configuration */ + def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = { + + import RollingFileAppender._ + + val rollingStrategy = conf.get(STRATEGY_PROPERTY, STRATEGY_DEFAULT) + val rollingSizeBytes = conf.get(SIZE_PROPERTY, STRATEGY_DEFAULT) + val rollingInterval = conf.get(INTERVAL_PROPERTY, INTERVAL_DEFAULT) + + def createTimeBasedAppender() = { + val validatedParams: Option[(Long, String)] = rollingInterval match { + case "daily" => + logInfo(s"Rolling executor logs enabled for $file with daily rolling") + Some(24 * 60 * 60 * 1000L, "--YYYY-MM-dd") + case "hourly" => + logInfo(s"Rolling executor logs enabled for $file with hourly rolling") + Some(60 * 60 * 1000L, "--YYYY-MM-dd--HH") + case "minutely" => + logInfo(s"Rolling executor logs enabled for $file with rolling every minute") + Some(60 * 1000L, "--YYYY-MM-dd--HH-mm") + case IntParam(seconds) => + logInfo(s"Rolling executor logs enabled for $file with rolling $seconds seconds") + Some(seconds * 1000L, "--YYYY-MM-dd--HH-mm-ss") + case _ => + logWarning(s"Illegal interval for rolling executor logs [$rollingInterval], " + + s"rolling logs not enabled") + None + } + validatedParams.map { + case (interval, pattern) => + new RollingFileAppender( + inputStream, file, new TimeBasedRollingPolicy(interval, pattern), conf) + }.getOrElse { + new FileAppender(inputStream, file) + } + } + + def createSizeBasedAppender() = { + rollingSizeBytes match { + case IntParam(bytes) => + logInfo(s"Rolling executor logs enabled for $file with rolling every $bytes bytes") + new RollingFileAppender(inputStream, file, new SizeBasedRollingPolicy(bytes), conf) + case _ => + logWarning( + s"Illegal size [$rollingSizeBytes] for rolling executor 
logs, rolling logs not enabled") + new FileAppender(inputStream, file) + } + } + + rollingStrategy match { + case "" => + new FileAppender(inputStream, file) + case "time" => + createTimeBasedAppender() + case "size" => + createSizeBasedAppender() + case _ => + logWarning( + s"Illegal strategy [$rollingStrategy] for rolling executor logs, " + + s"rolling logs not enabled") + new FileAppender(inputStream, file) + } + } +} + + diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala new file mode 100644 index 0000000000..1bbbd20cf0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.logging + +import java.io.{File, FileFilter, InputStream} + +import org.apache.commons.io.FileUtils +import org.apache.spark.SparkConf +import RollingFileAppender._ + +/** + * Continuously appends data from input stream into the given file, and rolls + * over the file after the given interval. The rolled over files are named + * based on the given pattern. 
+ * + * @param inputStream Input stream to read data from + * @param activeFile File to write data to + * @param rollingPolicy Policy based on which files will be rolled over. + * @param conf SparkConf that is used to pass on extra configurations + * @param bufferSize Optional buffer size. Used mainly for testing. + */ +private[spark] class RollingFileAppender( + inputStream: InputStream, + activeFile: File, + val rollingPolicy: RollingPolicy, + conf: SparkConf, + bufferSize: Int = DEFAULT_BUFFER_SIZE + ) extends FileAppender(inputStream, activeFile, bufferSize) { + + private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1) + + /** Stop the appender */ + override def stop() { + super.stop() + } + + /** Append bytes to file after rolling over is necessary */ + override protected def appendToFile(bytes: Array[Byte], len: Int) { + if (rollingPolicy.shouldRollover(len)) { + rollover() + rollingPolicy.rolledOver() + } + super.appendToFile(bytes, len) + rollingPolicy.bytesWritten(len) + } + + /** Rollover the file, by closing the output stream and moving it over */ + private def rollover() { + try { + closeFile() + moveFile() + openFile() + if (maxRetainedFiles > 0) { + deleteOldFiles() + } + } catch { + case e: Exception => + logError(s"Error rolling over $activeFile", e) + } + } + + /** Move the active log file to a new rollover file */ + private def moveFile() { + val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix() + val rolloverFile = new File( + activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile + try { + logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile") + if (activeFile.exists) { + if (!rolloverFile.exists) { + FileUtils.moveFile(activeFile, rolloverFile) + logInfo(s"Rolled over $activeFile to $rolloverFile") + } else { + // In case the rollover file name clashes, make a unique file name. 
+ // The resultant file names are long and ugly, so this is used only + // if there is a name collision. This can be avoided by the using + // the right pattern such that name collisions do not occur. + var i = 0 + var altRolloverFile: File = null + do { + altRolloverFile = new File(activeFile.getParent, + s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile + i += 1 + } while (i < 10000 && altRolloverFile.exists) + + logWarning(s"Rollover file $rolloverFile already exists, " + + s"rolled over $activeFile to file $altRolloverFile") + FileUtils.moveFile(activeFile, altRolloverFile) + } + } else { + logWarning(s"File $activeFile does not exist") + } + } + } + + /** Retain only last few files */ + private[util] def deleteOldFiles() { + try { + val rolledoverFiles = activeFile.getParentFile.listFiles(new FileFilter { + def accept(f: File): Boolean = { + f.getName.startsWith(activeFile.getName) && f != activeFile + } + }).sorted + val filesToBeDeleted = rolledoverFiles.take( + math.max(0, rolledoverFiles.size - maxRetainedFiles)) + filesToBeDeleted.foreach { file => + logInfo(s"Deleting file executor log file ${file.getAbsolutePath}") + file.delete() + } + } catch { + case e: Exception => + logError("Error cleaning logs in directory " + activeFile.getParentFile.getAbsolutePath, e) + } + } +} + +/** + * Companion object to [[org.apache.spark.util.logging.RollingFileAppender]]. Defines + * names of configurations that configure rolling file appenders. + */ +private[spark] object RollingFileAppender { + val STRATEGY_PROPERTY = "spark.executor.logs.rolling.strategy" + val STRATEGY_DEFAULT = "" + val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval" + val INTERVAL_DEFAULT = "daily" + val SIZE_PROPERTY = "spark.executor.logs.rolling.size.maxBytes" + val SIZE_DEFAULT = (1024 * 1024).toString + val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles" + val DEFAULT_BUFFER_SIZE = 8192 + + /** + * Get the sorted list of rolled over files. 
This assumes that the all the rolled + * over file names are prefixed with the `activeFileName`, and the active file + * name has the latest logs. So it sorts all the rolled over logs (that are + * prefixed with `activeFileName`) and appends the active file + */ + def getSortedRolledOverFiles(directory: String, activeFileName: String): Seq[File] = { + val rolledOverFiles = new File(directory).getAbsoluteFile.listFiles.filter { file => + val fileName = file.getName + fileName.startsWith(activeFileName) && fileName != activeFileName + }.sorted + val activeFile = { + val file = new File(directory, activeFileName).getAbsoluteFile + if (file.exists) Some(file) else None + } + rolledOverFiles ++ activeFile + } +} diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala new file mode 100644 index 0000000000..84e5c3c917 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/logging/RollingPolicy.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.logging + +import java.text.SimpleDateFormat +import java.util.Calendar + +import org.apache.spark.Logging + +/** + * Defines the policy based on which [[org.apache.spark.util.logging.RollingFileAppender]] will + * generate rolling files. + */ +private[spark] trait RollingPolicy { + + /** Whether rollover should be initiated at this moment */ + def shouldRollover(bytesToBeWritten: Long): Boolean + + /** Notify that rollover has occurred */ + def rolledOver() + + /** Notify that bytes have been written */ + def bytesWritten(bytes: Long) + + /** Get the desired name of the rollover file */ + def generateRolledOverFileSuffix(): String +} + +/** + * Defines a [[org.apache.spark.util.logging.RollingPolicy]] by which files will be rolled + * over at a fixed interval. + */ +private[spark] class TimeBasedRollingPolicy( + var rolloverIntervalMillis: Long, + rollingFileSuffixPattern: String, + checkIntervalConstraint: Boolean = true // set to false while testing + ) extends RollingPolicy with Logging { + + import TimeBasedRollingPolicy._ + if (checkIntervalConstraint && rolloverIntervalMillis < MINIMUM_INTERVAL_SECONDS * 1000L) { + logWarning(s"Rolling interval [${rolloverIntervalMillis/1000L} seconds] is too small. 
" + + s"Setting the interval to the acceptable minimum of $MINIMUM_INTERVAL_SECONDS seconds.") + rolloverIntervalMillis = MINIMUM_INTERVAL_SECONDS * 1000L + } + + @volatile private var nextRolloverTime = calculateNextRolloverTime() + private val formatter = new SimpleDateFormat(rollingFileSuffixPattern) + + /** Should rollover if current time has exceeded next rollover time */ + def shouldRollover(bytesToBeWritten: Long): Boolean = { + System.currentTimeMillis > nextRolloverTime + } + + /** Rollover has occurred, so find the next time to rollover */ + def rolledOver() { + nextRolloverTime = calculateNextRolloverTime() + logDebug(s"Current time: ${System.currentTimeMillis}, next rollover time: " + nextRolloverTime) + } + + def bytesWritten(bytes: Long) { } // nothing to do + + private def calculateNextRolloverTime(): Long = { + val now = System.currentTimeMillis() + val targetTime = ( + math.ceil(now.toDouble / rolloverIntervalMillis) * rolloverIntervalMillis + ).toLong + logDebug(s"Next rollover time is $targetTime") + targetTime + } + + def generateRolledOverFileSuffix(): String = { + formatter.format(Calendar.getInstance.getTime) + } +} + +private[spark] object TimeBasedRollingPolicy { + val MINIMUM_INTERVAL_SECONDS = 60L // 1 minute +} + +/** + * Defines a [[org.apache.spark.util.logging.RollingPolicy]] by which files will be rolled + * over after reaching a particular size. + */ +private[spark] class SizeBasedRollingPolicy( + var rolloverSizeBytes: Long, + checkSizeConstraint: Boolean = true // set to false while testing + ) extends RollingPolicy with Logging { + + import SizeBasedRollingPolicy._ + if (checkSizeConstraint && rolloverSizeBytes < MINIMUM_SIZE_BYTES) { + logWarning(s"Rolling size [$rolloverSizeBytes bytes] is too small. 
" + + s"Setting the size to the acceptable minimum of $MINIMUM_SIZE_BYTES bytes.") + rolloverSizeBytes = MINIMUM_SIZE_BYTES + } + + @volatile private var bytesWrittenSinceRollover = 0L + val formatter = new SimpleDateFormat("--YYYY-MM-dd--HH-mm-ss--SSSS") + + /** Should rollover if the next set of bytes is going to exceed the size limit */ + def shouldRollover(bytesToBeWritten: Long): Boolean = { + logInfo(s"$bytesToBeWritten + $bytesWrittenSinceRollover > $rolloverSizeBytes") + bytesToBeWritten + bytesWrittenSinceRollover > rolloverSizeBytes + } + + /** Rollover has occurred, so reset the counter */ + def rolledOver() { + bytesWrittenSinceRollover = 0 + } + + /** Increment the bytes that have been written in the current file */ + def bytesWritten(bytes: Long) { + bytesWrittenSinceRollover += bytes + } + + /** Get the desired name of the rollover file */ + def generateRolledOverFileSuffix(): String = { + formatter.format(Calendar.getInstance.getTime) + } +} + +private[spark] object SizeBasedRollingPolicy { + val MINIMUM_SIZE_BYTES = RollingFileAppender.DEFAULT_BUFFER_SIZE * 10 +} + diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index bfae32dae0..01ab2d5493 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.FunSuite import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo} import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} +import org.apache.spark.SparkConf class JsonProtocolSuite extends FunSuite { @@ -116,7 +117,8 @@ class JsonProtocolSuite extends FunSuite { } def createExecutorRunner(): ExecutorRunner = { new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, 
"workerId", "host", - new File("sparkHome"), new File("workDir"), "akka://worker", ExecutorState.RUNNING) + new File("sparkHome"), new File("workDir"), "akka://worker", + new SparkConf, ExecutorState.RUNNING) } def createDriverRunner(): DriverRunner = { new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(), diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 8ae387fa0b..e5f748d555 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -22,6 +22,7 @@ import java.io.File import org.scalatest.FunSuite import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState} +import org.apache.spark.SparkConf class ExecutorRunnerTest extends FunSuite { test("command includes appId") { @@ -32,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite { sparkHome, "appUiUrl") val appId = "12345-worker321-9876" val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")), - f("ooga"), "blah", ExecutorState.RUNNING) + f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING) assert(er.getCommandSeq.last === appId) } diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala new file mode 100644 index 0000000000..53d7f5c607 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.io._ + +import scala.collection.mutable.HashSet +import scala.reflect._ + +import org.apache.commons.io.{FileUtils, IOUtils} +import org.apache.spark.{Logging, SparkConf} +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy, FileAppender} + +class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging { + + val testFile = new File("FileAppenderSuite-test-" + System.currentTimeMillis).getAbsoluteFile + + before { + cleanup() + } + + after { + cleanup() + } + + test("basic file appender") { + val testString = (1 to 1000).mkString(", ") + val inputStream = IOUtils.toInputStream(testString) + val appender = new FileAppender(inputStream, testFile) + inputStream.close() + appender.awaitTermination() + assert(FileUtils.readFileToString(testFile) === testString) + } + + test("rolling file appender - time-based rolling") { + // setup input stream and appender + val testOutputStream = new PipedOutputStream() + val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000) + val rolloverIntervalMillis = 100 + val durationMillis = 1000 + val numRollovers = durationMillis / rolloverIntervalMillis + val textToAppend = (1 to numRollovers).map( _.toString * 10 ) + + val appender = new RollingFileAppender(testInputStream, 
testFile, + new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false), + new SparkConf(), 10) + + testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis) + } + + test("rolling file appender - size-based rolling") { + // setup input stream and appender + val testOutputStream = new PipedOutputStream() + val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000) + val rolloverSize = 1000 + val textToAppend = (1 to 3).map( _.toString * 1000 ) + + val appender = new RollingFileAppender(testInputStream, testFile, + new SizeBasedRollingPolicy(rolloverSize, false), new SparkConf(), 99) + + val files = testRolling(appender, testOutputStream, textToAppend, 0) + files.foreach { file => + logInfo(file.toString + ": " + file.length + " bytes") + assert(file.length <= rolloverSize) + } + } + + test("rolling file appender - cleaning") { + // setup input stream and appender + val testOutputStream = new PipedOutputStream() + val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000) + val conf = new SparkConf().set(RollingFileAppender.RETAINED_FILES_PROPERTY, "10") + val appender = new RollingFileAppender(testInputStream, testFile, + new SizeBasedRollingPolicy(1000, false), conf, 10) + + // send data to appender through the input stream, and wait for the data to be written + val allGeneratedFiles = new HashSet[String]() + val items = (1 to 10).map { _.toString * 10000 } + for (i <- 0 until items.size) { + testOutputStream.write(items(i).getBytes("UTF8")) + testOutputStream.flush() + allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles( + testFile.getParentFile.toString, testFile.getName).map(_.toString) + + Thread.sleep(10) + } + testOutputStream.close() + appender.awaitTermination() + logInfo("Appender closed") + + // verify whether the earliest file has been deleted + val rolledOverFiles = allGeneratedFiles.filter { _ != testFile.toString }.toArray.sorted + logInfo(s"All rolled over files 
generated:${rolledOverFiles.size}\n" + rolledOverFiles.mkString("\n")) + assert(rolledOverFiles.size > 2) + val earliestRolledOverFile = rolledOverFiles.head + val existingRolledOverFiles = RollingFileAppender.getSortedRolledOverFiles( + testFile.getParentFile.toString, testFile.getName).map(_.toString) + logInfo("Existing rolled over files:\n" + existingRolledOverFiles.mkString("\n")) + assert(!existingRolledOverFiles.toSet.contains(earliestRolledOverFile)) + } + + test("file appender selection") { + // Test whether FileAppender.apply() returns the right type of the FileAppender based + // on SparkConf settings. + + def testAppenderSelection[ExpectedAppender: ClassTag, ExpectedRollingPolicy]( + properties: Seq[(String, String)], expectedRollingPolicyParam: Long = -1): FileAppender = { + + // Set spark conf properties + val conf = new SparkConf + properties.foreach { p => + conf.set(p._1, p._2) + } + + // Create and test file appender + val inputStream = new PipedInputStream(new PipedOutputStream()) + val appender = FileAppender(inputStream, new File("stdout"), conf) + assert(appender.isInstanceOf[ExpectedAppender]) + assert(appender.getClass.getSimpleName === + classTag[ExpectedAppender].runtimeClass.getSimpleName) + if (appender.isInstanceOf[RollingFileAppender]) { + val rollingPolicy = appender.asInstanceOf[RollingFileAppender].rollingPolicy + rollingPolicy.isInstanceOf[ExpectedRollingPolicy] + val policyParam = if (rollingPolicy.isInstanceOf[TimeBasedRollingPolicy]) { + rollingPolicy.asInstanceOf[TimeBasedRollingPolicy].rolloverIntervalMillis + } else { + rollingPolicy.asInstanceOf[SizeBasedRollingPolicy].rolloverSizeBytes + } + assert(policyParam === expectedRollingPolicyParam) + } + appender + } + + import RollingFileAppender._ + + def rollingStrategy(strategy: String) = Seq(STRATEGY_PROPERTY -> strategy) + def rollingSize(size: String) = Seq(SIZE_PROPERTY -> size) + def rollingInterval(interval: String) = Seq(INTERVAL_PROPERTY -> interval) + + val msInDay = 
24 * 60 * 60 * 1000L + val msInHour = 60 * 60 * 1000L + val msInMinute = 60 * 1000L + + // test no strategy -> no rolling + testAppenderSelection[FileAppender, Any](Seq.empty) + + // test time based rolling strategy + testAppenderSelection[RollingFileAppender, Any](rollingStrategy("time"), msInDay) + testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy]( + rollingStrategy("time") ++ rollingInterval("daily"), msInDay) + testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy]( + rollingStrategy("time") ++ rollingInterval("hourly"), msInHour) + testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy]( + rollingStrategy("time") ++ rollingInterval("minutely"), msInMinute) + testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy]( + rollingStrategy("time") ++ rollingInterval("123456789"), 123456789 * 1000L) + testAppenderSelection[FileAppender, Any]( + rollingStrategy("time") ++ rollingInterval("xyz")) + + // test size based rolling strategy + testAppenderSelection[RollingFileAppender, SizeBasedRollingPolicy]( + rollingStrategy("size") ++ rollingSize("123456789"), 123456789) + testAppenderSelection[FileAppender, Any](rollingSize("xyz")) + + // test illegal strategy + testAppenderSelection[FileAppender, Any](rollingStrategy("xyz")) + } + + /** + * Run the rolling file appender with data and see whether all the data was written correctly + * across rolled over files. 
+ */ + def testRolling( + appender: FileAppender, + outputStream: OutputStream, + textToAppend: Seq[String], + sleepTimeBetweenTexts: Long + ): Seq[File] = { + // send data to appender through the input stream, and wait for the data to be written + val expectedText = textToAppend.mkString("") + for (i <- 0 until textToAppend.size) { + outputStream.write(textToAppend(i).getBytes("UTF8")) + outputStream.flush() + Thread.sleep(sleepTimeBetweenTexts) + } + logInfo("Data sent to appender") + outputStream.close() + appender.awaitTermination() + logInfo("Appender closed") + + // verify whether all the data written to rolled over files is same as expected + val generatedFiles = RollingFileAppender.getSortedRolledOverFiles( + testFile.getParentFile.toString, testFile.getName) + logInfo("Filtered files: \n" + generatedFiles.mkString("\n")) + assert(generatedFiles.size > 1) + val allText = generatedFiles.map { file => + FileUtils.readFileToString(file) + }.mkString("") + assert(allText === expectedText) + generatedFiles + } + + /** Delete all the generated rolledover files */ + def cleanup() { + testFile.getParentFile.listFiles.filter { file => + file.getName.startsWith(testFile.getName) + }.foreach { _.delete() } + } +} diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 0aad882ed7..1ee936bc78 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -140,6 +140,38 @@ class UtilsSuite extends FunSuite { Utils.deleteRecursively(tmpDir2) } + test("reading offset bytes across multiple files") { + val tmpDir = Files.createTempDir() + tmpDir.deleteOnExit() + val files = (1 to 3).map(i => new File(tmpDir, i.toString)) + Files.write("0123456789", files(0), Charsets.UTF_8) + Files.write("abcdefghij", files(1), Charsets.UTF_8) + Files.write("ABCDEFGHIJ", files(2), Charsets.UTF_8) + + // Read first few bytes in the 1st 
file + assert(Utils.offsetBytes(files, 0, 5) === "01234") + + // Read bytes within the 1st file + assert(Utils.offsetBytes(files, 5, 8) === "567") + + // Read bytes across 1st and 2nd file + assert(Utils.offsetBytes(files, 8, 18) === "89abcdefgh") + + // Read bytes across 1st, 2nd and 3rd file + assert(Utils.offsetBytes(files, 5, 24) === "56789abcdefghijABCD") + + // Read some nonexistent bytes in the beginning + assert(Utils.offsetBytes(files, -5, 18) === "0123456789abcdefgh") + + // Read some nonexistent bytes at the end + assert(Utils.offsetBytes(files, 18, 35) === "ijABCDEFGHIJ") + + // Read some nonexistent bytes on both ends + assert(Utils.offsetBytes(files, -5, 35) === "0123456789abcdefghijABCDEFGHIJ") + + Utils.deleteRecursively(tmpDir) + } + test("deserialize long value") { val testval : Long = 9730889947L val bbuf = ByteBuffer.allocate(8) diff --git a/docs/configuration.md b/docs/configuration.md index 71fafa5734..b84104cc7e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -784,6 +784,45 @@ Apart from these, the following properties are also available, and may be useful higher memory usage in Spark. </td> </tr> +<tr> + <td><code>spark.executor.logs.rolling.strategy</code></td> + <td>(none)</td> + <td> + Set the strategy of rolling of executor logs. By default it is disabled. It can + be set to "time" (time-based rolling) or "size" (size-based rolling). For "time", + use <code>spark.executor.logs.rolling.time.interval</code> to set the rolling interval. + For "size", use <code>spark.executor.logs.rolling.size.maxBytes</code> to set + the maximum file size for rolling. + </td> +</tr> +<tr> + <td><code>spark.executor.logs.rolling.time.interval</code></td> + <td>daily</td> + <td> + Set the time interval by which the executor logs will be rolled over. + Rolling is disabled by default. Valid values are `daily`, `hourly`, `minutely` or + any interval in seconds. 
See <code>spark.executor.logs.rolling.maxRetainedFiles</code> + for automatic cleaning of old logs. + </td> +</tr> +<tr> + <td><code>spark.executor.logs.rolling.size.maxBytes</code></td> + <td>(none)</td> + <td> + Set the maximum file size at which the executor logs will be rolled over. + Rolling is disabled by default. The value is specified in bytes. + See <code>spark.executor.logs.rolling.maxRetainedFiles</code> + for automatic cleaning of old logs. + </td> +</tr> +<tr> + <td><code>spark.executor.logs.rolling.maxRetainedFiles</code></td> + <td>(none)</td> + <td> + Sets the number of most recent rolled-over log files that will be retained by the system. + Older log files will be deleted. Disabled by default. + </td> +</tr> +</table> #### Cluster Managers |