author    Andrew Or <andrewor14@gmail.com>      2014-05-01 21:42:06 -0700
committer Patrick Wendell <pwendell@gmail.com>  2014-05-01 21:42:06 -0700
commit    394d8cb1c4dfd1e496562009e716b8fc06be22cd (patch)
tree      84bd990a34d195d94ceb83762bd91a65b612b8fa
parent    40cf6d31019c5402e5eb08158856242d20697ba4 (diff)
Add tests for FileLogger, EventLoggingListener, and ReplayListenerBus
Modifications to Spark core are limited to exposing functionality to test files + minor style fixes. (728 / 769 lines are from tests)

Author: Andrew Or <andrewor14@gmail.com>

Closes #591 from andrewor14/event-log-tests and squashes the following commits:

2883837 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
c3afcea [Andrew Or] Compromise
2d5daf8 [Andrew Or] Use temp directory provided by the OS rather than /tmp
2b52151 [Andrew Or] Remove unnecessary file delete + add a comment
62010fd [Andrew Or] More cleanup (renaming variables, updating comments etc)
ad2beff [Andrew Or] Clean up EventLoggingListenerSuite + modify a few comments
862e752 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
e0ba2f8 [Andrew Or] Fix test failures caused by race condition in processing/mutating events
b990453 [Andrew Or] ReplayListenerBus suite - tests do not all pass yet
ab66a84 [Andrew Or] Tests for FileLogger + delete file after tests
187bb25 [Andrew Or] Formatting and renaming variables
769336f [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
5d38ffe [Andrew Or] Clean up EventLoggingListenerSuite + add comments
e12f4b1 [Andrew Or] Preliminary tests for EventLoggingListener (need major cleanup)
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala      |  40
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala          |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/FileLogger.scala                     |  28
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                   |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                          |  18
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 400
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala       | 166
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala                | 163
8 files changed, 791 insertions(+), 36 deletions(-)
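For context before the diff: the new suites drive event logging entirely through SparkConf keys (the same ones EventLoggingListenerSuite.getLoggingConf sets at the bottom of the diff). A minimal standalone sketch of that configuration, with illustrative directory and codec values that are not part of this commit:

    import org.apache.spark.SparkConf

    // Hedged sketch: enable event logging the way the new test helper does.
    // The directory and codec below are placeholders, not values from the patch.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "/tmp/spark-events")
      .set("spark.eventLog.compress", "true")
      .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
    // "spark.eventLog.testing" additionally makes EventLoggingListener buffer each
    // serialized event in loggedEvents, which is what the new suites inspect:
    // conf.set("spark.eventLog.testing", "true")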
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index d822a8e551..7968a0691d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,13 +18,16 @@
package org.apache.spark.scheduler
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
+import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{FileLogger, JsonProtocol}
@@ -40,31 +43,36 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
*/
private[spark] class EventLoggingListener(
appName: String,
- conf: SparkConf,
- hadoopConfiguration: Configuration)
+ sparkConf: SparkConf,
+ hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
extends SparkListener with Logging {
import EventLoggingListener._
- private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
- private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
- private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
- private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
+ private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
+ private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
+ private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
+ private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+ private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
val logDir = logBaseDir + "/" + name
- private val logger =
- new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
- shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+ protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
+ shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
+
+ // For testing. Keep track of all JSON serialized events that have been logged.
+ private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
/**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
*/
def start() {
+ logger.start()
logInfo("Logging events to %s".format(logDir))
if (shouldCompress) {
- val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+ val codec =
+ sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
}
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
@@ -73,11 +81,14 @@ private[spark] class EventLoggingListener(
/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
- val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
- logger.logLine(eventJson)
+ val eventJson = JsonProtocol.sparkEventToJson(event)
+ logger.logLine(compact(render(eventJson)))
if (flushLogger) {
logger.flush()
}
+ if (testing) {
+ loggedEvents += eventJson
+ }
}
// Events that do not trigger a flush
@@ -121,13 +132,12 @@ private[spark] class EventLoggingListener(
}
private[spark] object EventLoggingListener extends Logging {
+ val DEFAULT_LOG_DIR = "/tmp/spark-events"
val LOG_PREFIX = "EVENT_LOG_"
val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
- val LOG_FILE_PERMISSIONS: FsPermission =
- FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
-
+ val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new mutable.HashMap[String, CompressionCodec]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index d6df193d9b..0286aac876 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -37,7 +37,7 @@ private[spark] trait SparkListenerBus {
* Post an event to all attached listeners. This does nothing if the event is
* SparkListenerShutdown.
*/
- protected def postToAll(event: SparkListenerEvent) {
+ def postToAll(event: SparkListenerEvent) {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 0965e0f0f7..0e6d21b220 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util
-import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
+import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
/**
@@ -39,8 +40,8 @@ import org.apache.spark.io.CompressionCodec
*/
private[spark] class FileLogger(
logDir: String,
- conf: SparkConf,
- hadoopConfiguration: Configuration,
+ sparkConf: SparkConf,
+ hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
overwrite: Boolean = true,
@@ -55,14 +56,19 @@ private[spark] class FileLogger(
var fileIndex = 0
// Only used if compression is enabled
- private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+ private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)
// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None
private var writer: Option[PrintWriter] = None
- createLogDir()
+ /**
+ * Start this logger by creating the logging directory.
+ */
+ def start() {
+ createLogDir()
+ }
/**
* Create a logging directory with the given path.
@@ -83,7 +89,7 @@ private[spark] class FileLogger(
}
if (dirPermissions.isDefined) {
val fsStatus = fileSystem.getFileStatus(path)
- if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+ if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) {
fileSystem.setPermission(path, dirPermissions.get)
}
}
@@ -92,14 +98,14 @@ private[spark] class FileLogger(
/**
* Create a new writer for the file identified by the given path.
* If the permissions are not passed in, it will default to use the permissions
- * (dirpermissions) used when class was instantiated.
+ * (dirPermissions) used when class was instantiated.
*/
private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
val logPath = logDir + "/" + fileName
val uri = new URI(logPath)
- val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
- val isDefaultLocal = (defaultFs == null || defaultFs == "file")
val path = new Path(logPath)
+ val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+ val isDefaultLocal = defaultFs == null || defaultFs == "file"
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
* Therefore, for local files, use FileOutputStream instead. */
@@ -112,7 +118,7 @@ private[spark] class FileLogger(
hadoopDataStream.get
}
- perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
+ perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) }
val bstream = new BufferedOutputStream(dstream, outputBufferSize)
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
new PrintWriter(cstream)
@@ -127,7 +133,7 @@ private[spark] class FileLogger(
val writeInfo = if (!withTime) {
msg
} else {
- val date = new Date(System.currentTimeMillis())
+ val date = new Date(System.currentTimeMillis)
dateFormat.get.format(date) + ": " + msg
}
writer.foreach(_.print(writeInfo))
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 9aed3e0985..09825087bb 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -646,11 +646,11 @@ private[spark] object JsonProtocol {
}
def propertiesFromJson(json: JValue): Properties = {
- val properties = new Properties()
- if (json != JNothing) {
- mapFromJson(json).map { case (k, v) => properties.setProperty(k, v) }
- }
- properties
+ Utils.jsonOption(json).map { value =>
+ val properties = new Properties
+ mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
+ properties
+ }.getOrElse(null)
}
def UUIDFromJson(json: JValue): UUID = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2c934a4bac..536a740140 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1062,15 +1062,25 @@ private[spark] object Utils extends Logging {
}
/**
- * return true if this is Windows.
+ * Return the absolute path of a file in the given directory.
*/
- def isWindows = Option(System.getProperty("os.name")).
- map(_.startsWith("Windows")).getOrElse(false)
+ def getFilePath(dir: File, fileName: String): Path = {
+ assert(dir.isDirectory)
+ val path = new File(dir, fileName).getAbsolutePath
+ new Path(path)
+ }
+
+ /**
+ * Return true if this is Windows.
+ */
+ def isWindows = {
+ Option(System.getProperty("os.name")).exists(_.startsWith("Windows"))
+ }
/**
* Indicates whether Spark is currently running unit tests.
*/
- private[spark] def isTesting = {
+ def isTesting = {
sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
new file mode 100644
index 0000000000..95f5bcd855
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.io.Source
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Test whether EventLoggingListener logs events properly.
+ *
+ * This tests whether EventLoggingListener actually creates special files while logging events,
+ * whether the parsing of these special files is correct, and whether the logged events can be
+ * read and deserialized into actual SparkListenerEvents.
+ */
+class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
+ private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val allCompressionCodecs = Seq[String](
+ "org.apache.spark.io.LZFCompressionCodec",
+ "org.apache.spark.io.SnappyCompressionCodec"
+ )
+ private val testDir = Files.createTempDir()
+ private val logDirPath = Utils.getFilePath(testDir, "spark-events")
+
+ after {
+ Try { fileSystem.delete(logDirPath, true) }
+ }
+
+ test("Parse names of special files") {
+ testParsingFileName()
+ }
+
+ test("Verify special files exist") {
+ testSpecialFilesExist()
+ }
+
+ test("Verify special files exist with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testSpecialFilesExist(compressionCodec = Some(codec))
+ }
+ }
+
+ test("Parse event logging info") {
+ testParsingLogInfo()
+ }
+
+ test("Parse event logging info with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testParsingLogInfo(compressionCodec = Some(codec))
+ }
+ }
+
+ test("Basic event logging") {
+ testEventLogging()
+ }
+
+ test("Basic event logging with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testEventLogging(compressionCodec = Some(codec))
+ }
+ }
+
+ test("End-to-end event logging") {
+ testApplicationEventLogging()
+ }
+
+ test("End-to-end event logging with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testApplicationEventLogging(compressionCodec = Some(codec))
+ }
+ }
+
+
+ /* ----------------- *
+ * Actual test logic *
+ * ----------------- */
+
+ import EventLoggingListenerSuite._
+
+ /**
+ * Test whether names of special files are correctly identified and parsed.
+ */
+ private def testParsingFileName() {
+ val logPrefix = EventLoggingListener.LOG_PREFIX
+ val sparkVersionPrefix = EventLoggingListener.SPARK_VERSION_PREFIX
+ val compressionCodecPrefix = EventLoggingListener.COMPRESSION_CODEC_PREFIX
+ val applicationComplete = EventLoggingListener.APPLICATION_COMPLETE
+ assert(EventLoggingListener.isEventLogFile(logPrefix + "0"))
+ assert(EventLoggingListener.isEventLogFile(logPrefix + "100"))
+ assert(EventLoggingListener.isEventLogFile(logPrefix + "ANYTHING"))
+ assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "0.9.1"))
+ assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "1.0.0"))
+ assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "ANYTHING"))
+ assert(EventLoggingListener.isApplicationCompleteFile(applicationComplete))
+ allCompressionCodecs.foreach { codec =>
+ assert(EventLoggingListener.isCompressionCodecFile(compressionCodecPrefix + codec))
+ }
+
+ // Negatives
+ assert(!EventLoggingListener.isEventLogFile("The greatest man of all mankind"))
+ assert(!EventLoggingListener.isSparkVersionFile("Will never falter in the face of death!"))
+ assert(!EventLoggingListener.isCompressionCodecFile("Unless he chooses to leave behind"))
+ assert(!EventLoggingListener.isApplicationCompleteFile("The very treasure he calls Macbeth"))
+
+ // Verify that parsing is correct
+ assert(EventLoggingListener.parseSparkVersion(sparkVersionPrefix + "1.0.0") === "1.0.0")
+ allCompressionCodecs.foreach { codec =>
+ assert(EventLoggingListener.parseCompressionCodec(compressionCodecPrefix + codec) === codec)
+ }
+ }
+
+ /**
+ * Test whether the special files produced by EventLoggingListener exist.
+ *
+ * There should be exactly one event log and one spark version file throughout the entire
+ * execution. If a compression codec is specified, then the compression codec file should
+ * also exist. Only after the application has completed does the test expect the application
+ * completed file to be present.
+ */
+ private def testSpecialFilesExist(compressionCodec: Option[String] = None) {
+
+ def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
+ val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
+ val numApplicationCompleteFiles = if (loggerStopped) 1 else 0
+ assert(logFiles.size === 2 + numCompressionCodecFiles + numApplicationCompleteFiles)
+ assert(eventLogsExist(logFiles))
+ assert(sparkVersionExists(logFiles))
+ assert(compressionCodecExists(logFiles) === compressionCodec.isDefined)
+ assert(applicationCompleteExists(logFiles) === loggerStopped)
+ assertSparkVersionIsValid(logFiles)
+ compressionCodec.foreach { codec =>
+ assertCompressionCodecIsValid(logFiles, codec)
+ }
+ }
+
+ // Verify logging directory exists
+ val conf = getLoggingConf(logDirPath, compressionCodec)
+ val eventLogger = new EventLoggingListener("test", conf)
+ eventLogger.start()
+ val logPath = new Path(eventLogger.logDir)
+ assert(fileSystem.exists(logPath))
+ val logDir = fileSystem.getFileStatus(logPath)
+ assert(logDir.isDir)
+
+ // Verify special files are as expected before stop()
+ var logFiles = fileSystem.listStatus(logPath)
+ assert(logFiles != null)
+ assertFilesExist(logFiles, loggerStopped = false)
+
+ // Verify special files are as expected after stop()
+ eventLogger.stop()
+ logFiles = fileSystem.listStatus(logPath)
+ assertFilesExist(logFiles, loggerStopped = true)
+ }
+
+ /**
+ * Test whether EventLoggingListener correctly parses the correct information from the logs.
+ *
+ * This includes whether it returns the correct Spark version, compression codec (if any),
+ * and the application's completion status.
+ */
+ private def testParsingLogInfo(compressionCodec: Option[String] = None) {
+
+ def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
+ assert(info.logPaths.size > 0)
+ assert(info.sparkVersion === SparkContext.SPARK_VERSION)
+ assert(info.compressionCodec.isDefined === compressionCodec.isDefined)
+ info.compressionCodec.foreach { codec =>
+ assert(compressionCodec.isDefined)
+ val expectedCodec = compressionCodec.get.split('.').last
+ assert(codec.getClass.getSimpleName === expectedCodec)
+ }
+ assert(info.applicationComplete === loggerStopped)
+ }
+
+ // Verify that all information is correctly parsed before stop()
+ val conf = getLoggingConf(logDirPath, compressionCodec)
+ val eventLogger = new EventLoggingListener("test", conf)
+ eventLogger.start()
+ var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+ assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
+
+ // Verify that all information is correctly parsed after stop()
+ eventLogger.stop()
+ eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+ assertInfoCorrect(eventLoggingInfo, loggerStopped = true)
+ }
+
+ /**
+ * Test basic event logging functionality.
+ *
+ * This creates two simple events, posts them to the EventLoggingListener, and verifies that
+ * exactly these two events are logged in the expected file.
+ */
+ private def testEventLogging(compressionCodec: Option[String] = None) {
+ val conf = getLoggingConf(logDirPath, compressionCodec)
+ val eventLogger = new EventLoggingListener("test", conf)
+ val listenerBus = new LiveListenerBus
+ val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+ val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+ // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
+ eventLogger.start()
+ listenerBus.start()
+ listenerBus.addListener(eventLogger)
+ listenerBus.postToAll(applicationStart)
+ listenerBus.postToAll(applicationEnd)
+
+ // Verify file contains exactly the two events logged
+ val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+ assert(eventLoggingInfo.logPaths.size > 0)
+ val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+ assert(lines.size === 2)
+ assert(lines(0).contains("SparkListenerApplicationStart"))
+ assert(lines(1).contains("SparkListenerApplicationEnd"))
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+ eventLogger.stop()
+ }
+
+ /**
+ * Test end-to-end event logging functionality in an application.
+ * This runs a simple Spark job and asserts that the expected events are logged when expected.
+ */
+ private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
+ val conf = getLoggingConf(logDirPath, compressionCodec)
+ val sc = new SparkContext("local", "test", conf)
+ assert(sc.eventLogger.isDefined)
+ val eventLogger = sc.eventLogger.get
+ val expectedLogDir = logDirPath.toString
+ assert(eventLogger.logDir.startsWith(expectedLogDir))
+
+ // Begin listening for events that trigger asserts
+ val eventExistenceListener = new EventExistenceListener(eventLogger)
+ sc.addSparkListener(eventExistenceListener)
+
+ // Trigger asserts for whether the expected events are actually logged
+ sc.parallelize(1 to 10000).count()
+ sc.stop()
+
+ // Ensure all asserts have actually been triggered
+ eventExistenceListener.assertAllCallbacksInvoked()
+ }
+
+ /**
+ * Assert that all of the specified events are logged by the given EventLoggingListener.
+ */
+ private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) {
+ val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+ assert(eventLoggingInfo.logPaths.size > 0)
+ val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+ val eventSet = mutable.Set(events: _*)
+ lines.foreach { line =>
+ eventSet.foreach { event =>
+ if (line.contains(event)) {
+ val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+ val eventType = Utils.getFormattedClassName(parsedEvent)
+ if (eventType == event) {
+ eventSet.remove(event)
+ }
+ }
+ }
+ }
+ assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+ }
+
+ /**
+ * Read all lines from the file specified by the given path.
+ * If a compression codec is specified, use it to read the file.
+ */
+ private def readFileLines(
+ filePath: Path,
+ compressionCodec: Option[CompressionCodec]): Seq[String] = {
+ val fstream = fileSystem.open(filePath)
+ val cstream =
+ compressionCodec.map { codec =>
+ codec.compressedInputStream(fstream)
+ }.getOrElse(fstream)
+ Source.fromInputStream(cstream).getLines().toSeq
+ }
+
+ /**
+ * A listener that asserts certain events are logged by the given EventLoggingListener.
+ * This is necessary because events are posted asynchronously in a different thread.
+ */
+ private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener {
+ var jobStarted = false
+ var jobEnded = false
+ var appEnded = false
+
+ override def onJobStart(jobStart: SparkListenerJobStart) {
+ assertEventsExist(eventLogger, Seq[String](
+ Utils.getFormattedClassName(SparkListenerApplicationStart),
+ Utils.getFormattedClassName(SparkListenerBlockManagerAdded),
+ Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
+ ))
+ jobStarted = true
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+ assertEventsExist(eventLogger, Seq[String](
+ Utils.getFormattedClassName(SparkListenerJobStart),
+ Utils.getFormattedClassName(SparkListenerJobEnd),
+ Utils.getFormattedClassName(SparkListenerStageSubmitted),
+ Utils.getFormattedClassName(SparkListenerStageCompleted),
+ Utils.getFormattedClassName(SparkListenerTaskStart),
+ Utils.getFormattedClassName(SparkListenerTaskEnd)
+ ))
+ jobEnded = true
+ }
+
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+ assertEventsExist(eventLogger, Seq[String](
+ Utils.getFormattedClassName(SparkListenerApplicationEnd)
+ ))
+ appEnded = true
+ }
+
+ def assertAllCallbacksInvoked() {
+ assert(jobStarted, "JobStart callback not invoked!")
+ assert(jobEnded, "JobEnd callback not invoked!")
+ assert(appEnded, "ApplicationEnd callback not invoked!")
+ }
+ }
+
+
+ /* -------------------------------------------------------- *
+ * Helper methods for validating state of the special files *
+ * -------------------------------------------------------- */
+
+ private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
+ logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)
+ }
+
+ private def sparkVersionExists(logFiles: Array[FileStatus]): Boolean = {
+ logFiles.map(_.getPath.getName).exists(EventLoggingListener.isSparkVersionFile)
+ }
+
+ private def compressionCodecExists(logFiles: Array[FileStatus]): Boolean = {
+ logFiles.map(_.getPath.getName).exists(EventLoggingListener.isCompressionCodecFile)
+ }
+
+ private def applicationCompleteExists(logFiles: Array[FileStatus]): Boolean = {
+ logFiles.map(_.getPath.getName).exists(EventLoggingListener.isApplicationCompleteFile)
+ }
+
+ private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) {
+ val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile)
+ assert(file.isDefined)
+ assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION)
+ }
+
+ private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) {
+ val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isCompressionCodecFile)
+ assert(file.isDefined)
+ assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec)
+ }
+
+}
+
+
+object EventLoggingListenerSuite {
+
+ /** Get a SparkConf with event logging enabled. */
+ def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = {
+ val conf = new SparkConf
+ conf.set("spark.eventLog.enabled", "true")
+ conf.set("spark.eventLog.testing", "true")
+ conf.set("spark.eventLog.dir", logDir.toString)
+ compressionCodec.foreach { codec =>
+ conf.set("spark.eventLog.compress", "true")
+ conf.set("spark.io.compression.codec", codec)
+ }
+ conf
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
new file mode 100644
index 0000000000..d1fe1fc348
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.PrintWriter
+
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Test whether ReplayListenerBus replays events from logs correctly.
+ */
+class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
+ private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val allCompressionCodecs = Seq[String](
+ "org.apache.spark.io.LZFCompressionCodec",
+ "org.apache.spark.io.SnappyCompressionCodec"
+ )
+ private val testDir = Files.createTempDir()
+
+ after {
+ Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) }
+ Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) }
+ }
+
+ test("Simple replay") {
+ testSimpleReplay()
+ }
+
+ test("Simple replay with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testSimpleReplay(Some(codec))
+ }
+ }
+
+ // This assumes the correctness of EventLoggingListener
+ test("End-to-end replay") {
+ testApplicationReplay()
+ }
+
+ // This assumes the correctness of EventLoggingListener
+ test("End-to-end replay with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testApplicationReplay(Some(codec))
+ }
+ }
+
+
+ /* ----------------- *
+ * Actual test logic *
+ * ----------------- */
+
+ /**
+ * Test simple replaying of events.
+ */
+ private def testSimpleReplay(codecName: Option[String] = None) {
+ val logFilePath = Utils.getFilePath(testDir, "events.txt")
+ val codec = codecName.map(getCompressionCodec)
+ val fstream = fileSystem.create(logFilePath)
+ val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+ val writer = new PrintWriter(cstream)
+ val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+ val applicationEnd = SparkListenerApplicationEnd(1000L)
+ writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+ writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+ writer.close()
+ val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+ val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName)
+ val eventMonster = new EventMonster(conf)
+ replayer.addListener(eventMonster)
+ replayer.replay()
+ assert(eventMonster.loggedEvents.size === 2)
+ assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
+ assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
+ }
+
+ /**
+ * Test end-to-end replaying of events.
+ *
+ * This test runs a few simple jobs with event logging enabled, and compares each emitted
+ * event to the corresponding event replayed from the event logs. This test makes the
+ * assumption that the event logging behavior is correct (tested in a separate suite).
+ */
+ private def testApplicationReplay(codecName: Option[String] = None) {
+ val logDirPath = Utils.getFilePath(testDir, "test-replay")
+ val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
+ val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
+
+ // Run a few jobs
+ sc.parallelize(1 to 100, 1).count()
+ sc.parallelize(1 to 100, 2).map(i => (i, i)).count()
+ sc.parallelize(1 to 100, 3).map(i => (i, i)).groupByKey().count()
+ sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().persist().count()
+ sc.stop()
+
+ // Prepare information needed for replay
+ val codec = codecName.map(getCompressionCodec)
+ val applications = fileSystem.listStatus(logDirPath)
+ assert(applications != null && applications.size > 0)
+ val eventLogDir = applications.sortBy(_.getAccessTime).last
+ assert(eventLogDir.isDir)
+ val logFiles = fileSystem.listStatus(eventLogDir.getPath)
+ assert(logFiles != null && logFiles.size > 0)
+ val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
+ assert(logFile.isDefined)
+ val logFilePath = logFile.get.getPath
+
+ // Replay events
+ val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+ val eventMonster = new EventMonster(conf)
+ replayer.addListener(eventMonster)
+ replayer.replay()
+
+ // Verify the same events are replayed in the same order
+ assert(sc.eventLogger.isDefined)
+ val originalEvents = sc.eventLogger.get.loggedEvents
+ val replayedEvents = eventMonster.loggedEvents
+ originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) }
+ }
+
+ /**
+ * A simple listener that buffers all the events it receives.
+ *
+ * The event buffering functionality must be implemented within EventLoggingListener itself.
+ * This is because of the following race condition: the event may be mutated between being
+ * processed by one listener and being processed by another. Thus, in order to establish
+ * a fair comparison between the original events and the replayed events, both functionalities
+ * must be implemented within one listener (i.e. the EventLoggingListener).
+ *
+ * This child listener inherits only the event buffering functionality, but does not actually
+ * log the events.
+ */
+ private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
+ logger.close()
+ }
+
+ private def getCompressionCodec(codecName: String) = {
+ val conf = new SparkConf
+ conf.set("spark.io.compression.codec", codecName)
+ CompressionCodec.createCodec(conf)
+ }
+
+}
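The core replay loop exercised by ReplayListenerSuite above is small. A condensed sketch of it, assuming an existing uncompressed EVENT_LOG_ file already produced by EventLoggingListener (the path below is a placeholder; ReplayListenerBus is private[spark], so code like this lives under an org.apache.spark package, as the suite does):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.scheduler.{ReplayListenerBus, SparkListener, SparkListenerApplicationEnd}
    import org.apache.spark.util.Utils

    val fileSystem = Utils.getHadoopFileSystem("/")
    // Placeholder path to an event log written by a previous application run.
    val logFile = new Path("/tmp/spark-events/my-app-1398988926000/EVENT_LOG_1")

    // Replay the logged events into an ad-hoc listener and check one of them arrived.
    var sawApplicationEnd = false
    val replayer = new ReplayListenerBus(Seq(logFile), fileSystem, None)
    replayer.addListener(new SparkListener {
      override def onApplicationEnd(end: SparkListenerApplicationEnd) { sawApplicationEnd = true }
    })
    replayer.replay()
    assert(sawApplicationEnd)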
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
new file mode 100644
index 0000000000..f675e1e5b4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+
+import scala.io.Source
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.Path
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Test writing files through the FileLogger.
+ */
+class FileLoggerSuite extends FunSuite with BeforeAndAfter {
+ private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val allCompressionCodecs = Seq[String](
+ "org.apache.spark.io.LZFCompressionCodec",
+ "org.apache.spark.io.SnappyCompressionCodec"
+ )
+ private val testDir = Files.createTempDir()
+ private val logDirPath = Utils.getFilePath(testDir, "test-file-logger")
+ private val logDirPathString = logDirPath.toString
+
+ after {
+ Try { fileSystem.delete(logDirPath, true) }
+ }
+
+ test("Simple logging") {
+ testSingleFile()
+ }
+
+ test ("Simple logging with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testSingleFile(Some(codec))
+ }
+ }
+
+ test("Logging multiple files") {
+ testMultipleFiles()
+ }
+
+ test("Logging multiple files with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testMultipleFiles(Some(codec))
+ }
+ }
+
+ test("Logging when directory already exists") {
+ // Create the logging directory multiple times
+ new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+ new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+ new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+
+ // If overwrite is not enabled, an exception should be thrown
+ intercept[IOException] {
+ new FileLogger(logDirPathString, new SparkConf, overwrite = false).start()
+ }
+ }
+
+
+ /* ----------------- *
+ * Actual test logic *
+ * ----------------- */
+
+ /**
+ * Test logging to a single file.
+ */
+ private def testSingleFile(codecName: Option[String] = None) {
+ val conf = getLoggingConf(codecName)
+ val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+ val logger =
+ if (codecName.isDefined) {
+ new FileLogger(logDirPathString, conf, compress = true)
+ } else {
+ new FileLogger(logDirPathString, conf)
+ }
+ logger.start()
+ assert(fileSystem.exists(logDirPath))
+ assert(fileSystem.getFileStatus(logDirPath).isDir)
+ assert(fileSystem.listStatus(logDirPath).size === 0)
+
+ logger.newFile()
+ val files = fileSystem.listStatus(logDirPath)
+ assert(files.size === 1)
+ val firstFile = files.head
+ val firstFilePath = firstFile.getPath
+
+ logger.log("hello")
+ logger.flush()
+ assert(readFileContent(firstFilePath, codec) === "hello")
+
+ logger.log(" world")
+ logger.close()
+ assert(readFileContent(firstFilePath, codec) === "hello world")
+ }
+
+ /**
+ * Test logging to multiple files.
+ */
+ private def testMultipleFiles(codecName: Option[String] = None) {
+ val conf = getLoggingConf(codecName)
+ val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+ val logger =
+ if (codecName.isDefined) {
+ new FileLogger(logDirPathString, conf, compress = true)
+ } else {
+ new FileLogger(logDirPathString, conf)
+ }
+ logger.start()
+ logger.newFile("Jean_Valjean")
+ logger.logLine("Who am I?")
+ logger.logLine("Destiny?")
+ logger.newFile("John_Valjohn")
+ logger.logLine("One")
+ logger.logLine("Two three...")
+ logger.newFile("Wolverine")
+ logger.logLine("There was a time")
+ logger.logLine("A time when our enemies knew honor.")
+ logger.close()
+ assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?")
+ assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...")
+ assert(readFileContent(new Path(logDirPath, "Wolverine"), codec) ===
+ "There was a time\nA time when our enemies knew honor.")
+ }
+
+ /**
+ * Read the content of the file specified by the given path.
+ * If a compression codec is specified, use it to read the file.
+ */
+ private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = {
+ val fstream = fileSystem.open(logPath)
+ val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream)
+ Source.fromInputStream(cstream).getLines().mkString("\n")
+ }
+
+ private def getLoggingConf(codecName: Option[String]) = {
+ val conf = new SparkConf
+ codecName.foreach { c => conf.set("spark.io.compression.codec", c) }
+ conf
+ }
+
+}