author	Andrew Or <andrewor14@gmail.com>	2014-05-01 21:42:06 -0700
committer	Patrick Wendell <pwendell@gmail.com>	2014-05-01 21:42:06 -0700
commit	394d8cb1c4dfd1e496562009e716b8fc06be22cd (patch)
tree	84bd990a34d195d94ceb83762bd91a65b612b8fa /core/src/test
parent	40cf6d31019c5402e5eb08158856242d20697ba4 (diff)
Add tests for FileLogger, EventLoggingListener, and ReplayListenerBus
Modifications to Spark core are limited to exposing functionality to test files + minor style fixes. (728 / 769 lines are from tests)

Author: Andrew Or <andrewor14@gmail.com>

Closes #591 from andrewor14/event-log-tests and squashes the following commits:

2883837 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
c3afcea [Andrew Or] Compromise
2d5daf8 [Andrew Or] Use temp directory provided by the OS rather than /tmp
2b52151 [Andrew Or] Remove unnecessary file delete + add a comment
62010fd [Andrew Or] More cleanup (renaming variables, updating comments etc)
ad2beff [Andrew Or] Clean up EventLoggingListenerSuite + modify a few comments
862e752 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
e0ba2f8 [Andrew Or] Fix test failures caused by race condition in processing/mutating events
b990453 [Andrew Or] ReplayListenerBus suite - tests do not all pass yet
ab66a84 [Andrew Or] Tests for FileLogger + delete file after tests
187bb25 [Andrew Or] Formatting and renaming variables
769336f [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
5d38ffe [Andrew Or] Clean up EventLoggingListenerSuite + add comments
e12f4b1 [Andrew Or] Preliminary tests for EventLoggingListener (need major cleanup)
Diffstat (limited to 'core/src/test')
-rw-r--r--	core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala	400
-rw-r--r--	core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala	166
-rw-r--r--	core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala	163
3 files changed, 729 insertions, 0 deletions
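
The three suites below share one pattern: build a SparkConf with event logging enabled via the getLoggingConf helper in EventLoggingListenerSuite's companion object, start an EventLoggingListener against a temporary directory, and read the resulting special files back with EventLoggingListener.parseLoggingInfo. A minimal sketch of that round trip, assuming the test classes are on the classpath (the object name EventLogRoundTripSketch is illustrative, not part of this commit):

import com.google.common.io.Files
import org.apache.spark.scheduler.{EventLoggingListener, EventLoggingListenerSuite}
import org.apache.spark.util.Utils

object EventLogRoundTripSketch {
  def main(args: Array[String]) {
    val fileSystem = Utils.getHadoopFileSystem("/")
    val logDirPath = Utils.getFilePath(Files.createTempDir(), "spark-events")
    // Conf with spark.eventLog.enabled and spark.eventLog.dir set (no compression)
    val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath)
    val eventLogger = new EventLoggingListener("sketch", conf)
    eventLogger.start()   // creates the log dir with the event log and Spark version files
    eventLogger.stop()    // adds the application-complete marker file
    val info = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
    assert(info.applicationComplete)
    assert(info.compressionCodec.isEmpty)
    fileSystem.delete(logDirPath, true)
  }
}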
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
new file mode 100644
index 0000000000..95f5bcd855
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.io.Source
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Test whether EventLoggingListener logs events properly.
+ *
+ * This tests whether EventLoggingListener actually creates special files while logging events,
+ * whether the parsing of these special files is correct, and whether the logged events can be
+ * read and deserialized into actual SparkListenerEvents.
+ */
+class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
+ private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val allCompressionCodecs = Seq[String](
+ "org.apache.spark.io.LZFCompressionCodec",
+ "org.apache.spark.io.SnappyCompressionCodec"
+ )
+ private val testDir = Files.createTempDir()
+ private val logDirPath = Utils.getFilePath(testDir, "spark-events")
+
+ after {
+ Try { fileSystem.delete(logDirPath, true) }
+ }
+
+ test("Parse names of special files") {
+ testParsingFileName()
+ }
+
+ test("Verify special files exist") {
+ testSpecialFilesExist()
+ }
+
+ test("Verify special files exist with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testSpecialFilesExist(compressionCodec = Some(codec))
+ }
+ }
+
+ test("Parse event logging info") {
+ testParsingLogInfo()
+ }
+
+ test("Parse event logging info with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testParsingLogInfo(compressionCodec = Some(codec))
+ }
+ }
+
+ test("Basic event logging") {
+ testEventLogging()
+ }
+
+ test("Basic event logging with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testEventLogging(compressionCodec = Some(codec))
+ }
+ }
+
+ test("End-to-end event logging") {
+ testApplicationEventLogging()
+ }
+
+ test("End-to-end event logging with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testApplicationEventLogging(compressionCodec = Some(codec))
+ }
+ }
+
+
+ /* ----------------- *
+ * Actual test logic *
+ * ----------------- */
+
+ import EventLoggingListenerSuite._
+
+ /**
+ * Test whether names of special files are correctly identified and parsed.
+ */
+ private def testParsingFileName() {
+ val logPrefix = EventLoggingListener.LOG_PREFIX
+ val sparkVersionPrefix = EventLoggingListener.SPARK_VERSION_PREFIX
+ val compressionCodecPrefix = EventLoggingListener.COMPRESSION_CODEC_PREFIX
+ val applicationComplete = EventLoggingListener.APPLICATION_COMPLETE
+ assert(EventLoggingListener.isEventLogFile(logPrefix + "0"))
+ assert(EventLoggingListener.isEventLogFile(logPrefix + "100"))
+ assert(EventLoggingListener.isEventLogFile(logPrefix + "ANYTHING"))
+ assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "0.9.1"))
+ assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "1.0.0"))
+ assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "ANYTHING"))
+ assert(EventLoggingListener.isApplicationCompleteFile(applicationComplete))
+ allCompressionCodecs.foreach { codec =>
+ assert(EventLoggingListener.isCompressionCodecFile(compressionCodecPrefix + codec))
+ }
+
+ // Negatives
+ assert(!EventLoggingListener.isEventLogFile("The greatest man of all mankind"))
+ assert(!EventLoggingListener.isSparkVersionFile("Will never falter in the face of death!"))
+ assert(!EventLoggingListener.isCompressionCodecFile("Unless he chooses to leave behind"))
+ assert(!EventLoggingListener.isApplicationCompleteFile("The very treasure he calls Macbeth"))
+
+ // Verify that parsing is correct
+ assert(EventLoggingListener.parseSparkVersion(sparkVersionPrefix + "1.0.0") === "1.0.0")
+ allCompressionCodecs.foreach { codec =>
+ assert(EventLoggingListener.parseCompressionCodec(compressionCodecPrefix + codec) === codec)
+ }
+ }
+
+ /**
+ * Test whether the special files produced by EventLoggingListener exist.
+ *
+ * There should be exactly one event log and one spark version file throughout the entire
+ * execution. If a compression codec is specified, then the compression codec file should
+ * also exist. Only after the application has completed does the test expect the application
+ * completed file to be present.
+ */
+ private def testSpecialFilesExist(compressionCodec: Option[String] = None) {
+
+ def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
+ val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
+ val numApplicationCompleteFiles = if (loggerStopped) 1 else 0
+ assert(logFiles.size === 2 + numCompressionCodecFiles + numApplicationCompleteFiles)
+ assert(eventLogsExist(logFiles))
+ assert(sparkVersionExists(logFiles))
+ assert(compressionCodecExists(logFiles) === compressionCodec.isDefined)
+ assert(applicationCompleteExists(logFiles) === loggerStopped)
+ assertSparkVersionIsValid(logFiles)
+ compressionCodec.foreach { codec =>
+ assertCompressionCodecIsValid(logFiles, codec)
+ }
+ }
+
+ // Verify logging directory exists
+ val conf = getLoggingConf(logDirPath, compressionCodec)
+ val eventLogger = new EventLoggingListener("test", conf)
+ eventLogger.start()
+ val logPath = new Path(eventLogger.logDir)
+ assert(fileSystem.exists(logPath))
+ val logDir = fileSystem.getFileStatus(logPath)
+ assert(logDir.isDir)
+
+ // Verify special files are as expected before stop()
+ var logFiles = fileSystem.listStatus(logPath)
+ assert(logFiles != null)
+ assertFilesExist(logFiles, loggerStopped = false)
+
+ // Verify special files are as expected after stop()
+ eventLogger.stop()
+ logFiles = fileSystem.listStatus(logPath)
+ assertFilesExist(logFiles, loggerStopped = true)
+ }
+
+ /**
+ * Test whether EventLoggingListener correctly parses information from the logs.
+ *
+ * This includes whether it returns the correct Spark version, compression codec (if any),
+ * and the application's completion status.
+ */
+ private def testParsingLogInfo(compressionCodec: Option[String] = None) {
+
+ def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
+ assert(info.logPaths.size > 0)
+ assert(info.sparkVersion === SparkContext.SPARK_VERSION)
+ assert(info.compressionCodec.isDefined === compressionCodec.isDefined)
+ info.compressionCodec.foreach { codec =>
+ assert(compressionCodec.isDefined)
+ val expectedCodec = compressionCodec.get.split('.').last
+ assert(codec.getClass.getSimpleName === expectedCodec)
+ }
+ assert(info.applicationComplete === loggerStopped)
+ }
+
+ // Verify that all information is correctly parsed before stop()
+ val conf = getLoggingConf(logDirPath, compressionCodec)
+ val eventLogger = new EventLoggingListener("test", conf)
+ eventLogger.start()
+ var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+ assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
+
+ // Verify that all information is correctly parsed after stop()
+ eventLogger.stop()
+ eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+ assertInfoCorrect(eventLoggingInfo, loggerStopped = true)
+ }
+
+ /**
+ * Test basic event logging functionality.
+ *
+ * This creates two simple events, posts them to the EventLoggingListener, and verifies that
+ * exactly these two events are logged in the expected file.
+ */
+ private def testEventLogging(compressionCodec: Option[String] = None) {
+ val conf = getLoggingConf(logDirPath, compressionCodec)
+ val eventLogger = new EventLoggingListener("test", conf)
+ val listenerBus = new LiveListenerBus
+ val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+ val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+ // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
+ eventLogger.start()
+ listenerBus.start()
+ listenerBus.addListener(eventLogger)
+ listenerBus.postToAll(applicationStart)
+ listenerBus.postToAll(applicationEnd)
+
+ // Verify file contains exactly the two events logged
+ val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+ assert(eventLoggingInfo.logPaths.size > 0)
+ val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+ assert(lines.size === 2)
+ assert(lines(0).contains("SparkListenerApplicationStart"))
+ assert(lines(1).contains("SparkListenerApplicationEnd"))
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+ eventLogger.stop()
+ }
+
+ /**
+ * Test end-to-end event logging functionality in an application.
+ * This runs a simple Spark job and asserts that the expected events are logged when expected.
+ */
+ private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
+ val conf = getLoggingConf(logDirPath, compressionCodec)
+ val sc = new SparkContext("local", "test", conf)
+ assert(sc.eventLogger.isDefined)
+ val eventLogger = sc.eventLogger.get
+ val expectedLogDir = logDirPath.toString
+ assert(eventLogger.logDir.startsWith(expectedLogDir))
+
+ // Begin listening for events that trigger asserts
+ val eventExistenceListener = new EventExistenceListener(eventLogger)
+ sc.addSparkListener(eventExistenceListener)
+
+ // Trigger asserts for whether the expected events are actually logged
+ sc.parallelize(1 to 10000).count()
+ sc.stop()
+
+ // Ensure all asserts have actually been triggered
+ eventExistenceListener.assertAllCallbacksInvoked()
+ }
+
+ /**
+ * Assert that all of the specified events are logged by the given EventLoggingListener.
+ */
+ private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) {
+ val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+ assert(eventLoggingInfo.logPaths.size > 0)
+ val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+ val eventSet = mutable.Set(events: _*)
+ lines.foreach { line =>
+ eventSet.foreach { event =>
+ if (line.contains(event)) {
+ val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+ val eventType = Utils.getFormattedClassName(parsedEvent)
+ if (eventType == event) {
+ eventSet.remove(event)
+ }
+ }
+ }
+ }
+ assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+ }
+
+ /**
+ * Read all lines from the file specified by the given path.
+ * If a compression codec is specified, use it to read the file.
+ */
+ private def readFileLines(
+ filePath: Path,
+ compressionCodec: Option[CompressionCodec]): Seq[String] = {
+ val fstream = fileSystem.open(filePath)
+ val cstream =
+ compressionCodec.map { codec =>
+ codec.compressedInputStream(fstream)
+ }.getOrElse(fstream)
+ Source.fromInputStream(cstream).getLines().toSeq
+ }
+
+ /**
+ * A listener that asserts certain events are logged by the given EventLoggingListener.
+ * This is necessary because events are posted asynchronously in a different thread.
+ */
+ private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener {
+ var jobStarted = false
+ var jobEnded = false
+ var appEnded = false
+
+ override def onJobStart(jobStart: SparkListenerJobStart) {
+ assertEventsExist(eventLogger, Seq[String](
+ Utils.getFormattedClassName(SparkListenerApplicationStart),
+ Utils.getFormattedClassName(SparkListenerBlockManagerAdded),
+ Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
+ ))
+ jobStarted = true
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+ assertEventsExist(eventLogger, Seq[String](
+ Utils.getFormattedClassName(SparkListenerJobStart),
+ Utils.getFormattedClassName(SparkListenerJobEnd),
+ Utils.getFormattedClassName(SparkListenerStageSubmitted),
+ Utils.getFormattedClassName(SparkListenerStageCompleted),
+ Utils.getFormattedClassName(SparkListenerTaskStart),
+ Utils.getFormattedClassName(SparkListenerTaskEnd)
+ ))
+ jobEnded = true
+ }
+
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+ assertEventsExist(eventLogger, Seq[String](
+ Utils.getFormattedClassName(SparkListenerApplicationEnd)
+ ))
+ appEnded = true
+ }
+
+ def assertAllCallbacksInvoked() {
+ assert(jobStarted, "JobStart callback not invoked!")
+ assert(jobEnded, "JobEnd callback not invoked!")
+ assert(appEnded, "ApplicationEnd callback not invoked!")
+ }
+ }
+
+
+ /* -------------------------------------------------------- *
+ * Helper methods for validating state of the special files *
+ * -------------------------------------------------------- */
+
+ private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
+ logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)
+ }
+
+ private def sparkVersionExists(logFiles: Array[FileStatus]): Boolean = {
+ logFiles.map(_.getPath.getName).exists(EventLoggingListener.isSparkVersionFile)
+ }
+
+ private def compressionCodecExists(logFiles: Array[FileStatus]): Boolean = {
+ logFiles.map(_.getPath.getName).exists(EventLoggingListener.isCompressionCodecFile)
+ }
+
+ private def applicationCompleteExists(logFiles: Array[FileStatus]): Boolean = {
+ logFiles.map(_.getPath.getName).exists(EventLoggingListener.isApplicationCompleteFile)
+ }
+
+ private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) {
+ val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile)
+ assert(file.isDefined)
+ assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION)
+ }
+
+ private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) {
+ val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isCompressionCodecFile)
+ assert(file.isDefined)
+ assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec)
+ }
+
+}
+
+
+object EventLoggingListenerSuite {
+
+ /** Get a SparkConf with event logging enabled. */
+ def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = {
+ val conf = new SparkConf
+ conf.set("spark.eventLog.enabled", "true")
+ conf.set("spark.eventLog.testing", "true")
+ conf.set("spark.eventLog.dir", logDir.toString)
+ compressionCodec.foreach { codec =>
+ conf.set("spark.eventLog.compress", "true")
+ conf.set("spark.io.compression.codec", codec)
+ }
+ conf
+ }
+}
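
A quick illustration of the file-name helpers exercised by "Parse names of special files" above: each special file name is a known prefix concatenated with a payload, and the parse* helpers recover the payload. This is a sketch only (SpecialFileNameSketch is an illustrative name, not part of this commit):

import org.apache.spark.scheduler.EventLoggingListener

object SpecialFileNameSketch {
  def main(args: Array[String]) {
    val versionFile = EventLoggingListener.SPARK_VERSION_PREFIX + "1.0.0"
    val codecFile = EventLoggingListener.COMPRESSION_CODEC_PREFIX +
      "org.apache.spark.io.LZFCompressionCodec"

    // Name recognition is prefix-based...
    assert(EventLoggingListener.isSparkVersionFile(versionFile))
    assert(EventLoggingListener.isCompressionCodecFile(codecFile))

    // ...and parsing strips the prefix back off.
    assert(EventLoggingListener.parseSparkVersion(versionFile) == "1.0.0")
    assert(EventLoggingListener.parseCompressionCodec(codecFile) ==
      "org.apache.spark.io.LZFCompressionCodec")
  }
}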
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
new file mode 100644
index 0000000000..d1fe1fc348
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.PrintWriter
+
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.json4s.jackson.JsonMethods._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Test whether ReplayListenerBus replays events from logs correctly.
+ */
+class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
+ private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val allCompressionCodecs = Seq[String](
+ "org.apache.spark.io.LZFCompressionCodec",
+ "org.apache.spark.io.SnappyCompressionCodec"
+ )
+ private val testDir = Files.createTempDir()
+
+ after {
+ Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) }
+ Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) }
+ }
+
+ test("Simple replay") {
+ testSimpleReplay()
+ }
+
+ test("Simple replay with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testSimpleReplay(Some(codec))
+ }
+ }
+
+ // This assumes the correctness of EventLoggingListener
+ test("End-to-end replay") {
+ testApplicationReplay()
+ }
+
+ // This assumes the correctness of EventLoggingListener
+ test("End-to-end replay with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testApplicationReplay(Some(codec))
+ }
+ }
+
+
+ /* ----------------- *
+ * Actual test logic *
+ * ----------------- */
+
+ /**
+ * Test simple replaying of events.
+ */
+ private def testSimpleReplay(codecName: Option[String] = None) {
+ val logFilePath = Utils.getFilePath(testDir, "events.txt")
+ val codec = codecName.map(getCompressionCodec)
+ val fstream = fileSystem.create(logFilePath)
+ val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+ val writer = new PrintWriter(cstream)
+ val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+ val applicationEnd = SparkListenerApplicationEnd(1000L)
+ writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+ writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+ writer.close()
+ val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+ val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName)
+ val eventMonster = new EventMonster(conf)
+ replayer.addListener(eventMonster)
+ replayer.replay()
+ assert(eventMonster.loggedEvents.size === 2)
+ assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
+ assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
+ }
+
+ /**
+ * Test end-to-end replaying of events.
+ *
+ * This test runs a few simple jobs with event logging enabled, and compares each emitted
+ * event to the corresponding event replayed from the event logs. This test makes the
+ * assumption that the event logging behavior is correct (tested in a separate suite).
+ */
+ private def testApplicationReplay(codecName: Option[String] = None) {
+ val logDirPath = Utils.getFilePath(testDir, "test-replay")
+ val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
+ val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
+
+ // Run a few jobs
+ sc.parallelize(1 to 100, 1).count()
+ sc.parallelize(1 to 100, 2).map(i => (i, i)).count()
+ sc.parallelize(1 to 100, 3).map(i => (i, i)).groupByKey().count()
+ sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().persist().count()
+ sc.stop()
+
+ // Prepare information needed for replay
+ val codec = codecName.map(getCompressionCodec)
+ val applications = fileSystem.listStatus(logDirPath)
+ assert(applications != null && applications.size > 0)
+ val eventLogDir = applications.sortBy(_.getAccessTime).last
+ assert(eventLogDir.isDir)
+ val logFiles = fileSystem.listStatus(eventLogDir.getPath)
+ assert(logFiles != null && logFiles.size > 0)
+ val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
+ assert(logFile.isDefined)
+ val logFilePath = logFile.get.getPath
+
+ // Replay events
+ val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+ val eventMonster = new EventMonster(conf)
+ replayer.addListener(eventMonster)
+ replayer.replay()
+
+ // Verify the same events are replayed in the same order
+ assert(sc.eventLogger.isDefined)
+ val originalEvents = sc.eventLogger.get.loggedEvents
+ val replayedEvents = eventMonster.loggedEvents
+ originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) }
+ }
+
+ /**
+ * A simple listener that buffers all the events it receives.
+ *
+ * The event buffering functionality must be implemented within EventLoggingListener itself.
+ * This is because of the following race condition: the event may be mutated between being
+ * processed by one listener and being processed by another. Thus, in order to establish
+ * a fair comparison between the original events and the replayed events, both functionalities
+ * must be implemented within one listener (i.e. the EventLoggingListener).
+ *
+ * This child listener inherits only the event buffering functionality, but does not actually
+ * log the events.
+ */
+ private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
+ logger.close()
+ }
+
+ private def getCompressionCodec(codecName: String) = {
+ val conf = new SparkConf
+ conf.set("spark.io.compression.codec", codecName)
+ CompressionCodec.createCodec(conf)
+ }
+
+}
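
In miniature, the "Simple replay" path above is: serialize events to JSON with JsonProtocol, write them one per line, then let ReplayListenerBus re-post them to any SparkListener. A hedged sketch without compression (ReplaySketch and the anonymous listener are illustrative, not part of this commit):

import java.io.PrintWriter

import com.google.common.io.Files
import org.json4s.jackson.JsonMethods._

import org.apache.spark.scheduler._
import org.apache.spark.util.{JsonProtocol, Utils}

object ReplaySketch {
  def main(args: Array[String]) {
    val fileSystem = Utils.getHadoopFileSystem("/")
    val logFilePath = Utils.getFilePath(Files.createTempDir(), "events.txt")

    // Write two events, one JSON document per line
    val writer = new PrintWriter(fileSystem.create(logFilePath))
    val appStart = SparkListenerApplicationStart("Sketch App", 125L, "Mickey")
    val appEnd = SparkListenerApplicationEnd(1000L)
    writer.println(compact(render(JsonProtocol.sparkEventToJson(appStart))))
    writer.println(compact(render(JsonProtocol.sparkEventToJson(appEnd))))
    writer.close()

    // Replay them into an ordinary SparkListener (no compression codec)
    var sawStart = false
    var sawEnd = false
    val listener = new SparkListener {
      override def onApplicationStart(event: SparkListenerApplicationStart) { sawStart = true }
      override def onApplicationEnd(event: SparkListenerApplicationEnd) { sawEnd = true }
    }
    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, None)
    replayer.addListener(listener)
    replayer.replay()
    assert(sawStart && sawEnd)
  }
}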
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
new file mode 100644
index 0000000000..f675e1e5b4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+
+import scala.io.Source
+import scala.util.Try
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.Path
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Test writing files through the FileLogger.
+ */
+class FileLoggerSuite extends FunSuite with BeforeAndAfter {
+ private val fileSystem = Utils.getHadoopFileSystem("/")
+ private val allCompressionCodecs = Seq[String](
+ "org.apache.spark.io.LZFCompressionCodec",
+ "org.apache.spark.io.SnappyCompressionCodec"
+ )
+ private val testDir = Files.createTempDir()
+ private val logDirPath = Utils.getFilePath(testDir, "test-file-logger")
+ private val logDirPathString = logDirPath.toString
+
+ after {
+ Try { fileSystem.delete(logDirPath, true) }
+ }
+
+ test("Simple logging") {
+ testSingleFile()
+ }
+
+ test("Simple logging with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testSingleFile(Some(codec))
+ }
+ }
+
+ test("Logging multiple files") {
+ testMultipleFiles()
+ }
+
+ test("Logging multiple files with compression") {
+ allCompressionCodecs.foreach { codec =>
+ testMultipleFiles(Some(codec))
+ }
+ }
+
+ test("Logging when directory already exists") {
+ // Create the logging directory multiple times
+ new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+ new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+ new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
+
+ // If overwrite is not enabled, an exception should be thrown
+ intercept[IOException] {
+ new FileLogger(logDirPathString, new SparkConf, overwrite = false).start()
+ }
+ }
+
+
+ /* ----------------- *
+ * Actual test logic *
+ * ----------------- */
+
+ /**
+ * Test logging to a single file.
+ */
+ private def testSingleFile(codecName: Option[String] = None) {
+ val conf = getLoggingConf(codecName)
+ val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+ val logger =
+ if (codecName.isDefined) {
+ new FileLogger(logDirPathString, conf, compress = true)
+ } else {
+ new FileLogger(logDirPathString, conf)
+ }
+ logger.start()
+ assert(fileSystem.exists(logDirPath))
+ assert(fileSystem.getFileStatus(logDirPath).isDir)
+ assert(fileSystem.listStatus(logDirPath).size === 0)
+
+ logger.newFile()
+ val files = fileSystem.listStatus(logDirPath)
+ assert(files.size === 1)
+ val firstFile = files.head
+ val firstFilePath = firstFile.getPath
+
+ logger.log("hello")
+ logger.flush()
+ assert(readFileContent(firstFilePath, codec) === "hello")
+
+ logger.log(" world")
+ logger.close()
+ assert(readFileContent(firstFilePath, codec) === "hello world")
+ }
+
+ /**
+ * Test logging to multiple files.
+ */
+ private def testMultipleFiles(codecName: Option[String] = None) {
+ val conf = getLoggingConf(codecName)
+ val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
+ val logger =
+ if (codecName.isDefined) {
+ new FileLogger(logDirPathString, conf, compress = true)
+ } else {
+ new FileLogger(logDirPathString, conf)
+ }
+ logger.start()
+ logger.newFile("Jean_Valjean")
+ logger.logLine("Who am I?")
+ logger.logLine("Destiny?")
+ logger.newFile("John_Valjohn")
+ logger.logLine("One")
+ logger.logLine("Two three...")
+ logger.newFile("Wolverine")
+ logger.logLine("There was a time")
+ logger.logLine("A time when our enemies knew honor.")
+ logger.close()
+ assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?")
+ assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...")
+ assert(readFileContent(new Path(logDirPath, "Wolverine"), codec) ===
+ "There was a time\nA time when our enemies knew honor.")
+ }
+
+ /**
+ * Read the content of the file specified by the given path.
+ * If a compression codec is specified, use it to read the file.
+ */
+ private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = {
+ val fstream = fileSystem.open(logPath)
+ val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream)
+ Source.fromInputStream(cstream).getLines().mkString("\n")
+ }
+
+ private def getLoggingConf(codecName: Option[String]) = {
+ val conf = new SparkConf
+ codecName.foreach { c => conf.set("spark.io.compression.codec", c) }
+ conf
+ }
+
+}
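
The FileLogger lifecycle that FileLoggerSuite exercises, in its simplest form: start() creates the log directory, newFile(name) opens a named file inside it, logLine() appends a line, and close() closes the current file. A minimal sketch without compression (FileLoggerSketch is an illustrative name only, not part of this commit):

import scala.io.Source

import com.google.common.io.Files
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf
import org.apache.spark.util.{FileLogger, Utils}

object FileLoggerSketch {
  def main(args: Array[String]) {
    val fileSystem = Utils.getHadoopFileSystem("/")
    val logDirPath = Utils.getFilePath(Files.createTempDir(), "file-logger-sketch")

    val logger = new FileLogger(logDirPath.toString, new SparkConf)
    logger.start()                  // creates the (empty) log directory
    logger.newFile("greetings")     // opens a file named "greetings" inside it
    logger.logLine("hello")
    logger.logLine("world")
    logger.close()

    val in = fileSystem.open(new Path(logDirPath, "greetings"))
    assert(Source.fromInputStream(in).getLines().mkString("\n") == "hello\nworld")
    fileSystem.delete(logDirPath, true)
  }
}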