diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-05-01 21:42:06 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-05-01 21:42:06 -0700 |
commit | 394d8cb1c4dfd1e496562009e716b8fc06be22cd (patch) | |
tree | 84bd990a34d195d94ceb83762bd91a65b612b8fa /core/src/test | |
parent | 40cf6d31019c5402e5eb08158856242d20697ba4 (diff) | |
download | spark-394d8cb1c4dfd1e496562009e716b8fc06be22cd.tar.gz spark-394d8cb1c4dfd1e496562009e716b8fc06be22cd.tar.bz2 spark-394d8cb1c4dfd1e496562009e716b8fc06be22cd.zip |
Add tests for FileLogger, EventLoggingListener, and ReplayListenerBus
Modifications to Spark core are limited to exposing functionality to test files + minor style fixes.
(728 / 769 lines are from tests)
Author: Andrew Or <andrewor14@gmail.com>
Closes #591 from andrewor14/event-log-tests and squashes the following commits:
2883837 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
c3afcea [Andrew Or] Compromise
2d5daf8 [Andrew Or] Use temp directory provided by the OS rather than /tmp
2b52151 [Andrew Or] Remove unnecessary file delete + add a comment
62010fd [Andrew Or] More cleanup (renaming variables, updating comments etc)
ad2beff [Andrew Or] Clean up EventLoggingListenerSuite + modify a few comments
862e752 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
e0ba2f8 [Andrew Or] Fix test failures caused by race condition in processing/mutating events
b990453 [Andrew Or] ReplayListenerBus suite - tests do not all pass yet
ab66a84 [Andrew Or] Tests for FileLogger + delete file after tests
187bb25 [Andrew Or] Formatting and renaming variables
769336f [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-tests
5d38ffe [Andrew Or] Clean up EventLoggingListenerSuite + add comments
e12f4b1 [Andrew Or] Preliminary tests for EventLoggingListener (need major cleanup)
Diffstat (limited to 'core/src/test')
3 files changed, 729 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala new file mode 100644 index 0000000000..95f5bcd855 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable +import scala.io.Source +import scala.util.Try + +import com.google.common.io.Files +import org.apache.hadoop.fs.{FileStatus, Path} +import org.json4s.jackson.JsonMethods._ +import org.scalatest.{BeforeAndAfter, FunSuite} + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.{JsonProtocol, Utils} + +/** + * Test whether EventLoggingListener logs events properly. + * + * This tests whether EventLoggingListener actually creates special files while logging events, + * whether the parsing of these special files is correct, and whether the logged events can be + * read and deserialized into actual SparkListenerEvents. + */ +class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { + private val fileSystem = Utils.getHadoopFileSystem("/") + private val allCompressionCodecs = Seq[String]( + "org.apache.spark.io.LZFCompressionCodec", + "org.apache.spark.io.SnappyCompressionCodec" + ) + private val testDir = Files.createTempDir() + private val logDirPath = Utils.getFilePath(testDir, "spark-events") + + after { + Try { fileSystem.delete(logDirPath, true) } + } + + test("Parse names of special files") { + testParsingFileName() + } + + test("Verify special files exist") { + testSpecialFilesExist() + } + + test("Verify special files exist with compression") { + allCompressionCodecs.foreach { codec => + testSpecialFilesExist(compressionCodec = Some(codec)) + } + } + + test("Parse event logging info") { + testParsingLogInfo() + } + + test("Parse event logging info with compression") { + allCompressionCodecs.foreach { codec => + testParsingLogInfo(compressionCodec = Some(codec)) + } + } + + test("Basic event logging") { + testEventLogging() + } + + test("Basic event logging with compression") { + allCompressionCodecs.foreach { codec => + testEventLogging(compressionCodec = Some(codec)) + } + } + + test("End-to-end event logging") { + testApplicationEventLogging() + } + + test("End-to-end event logging with compression") { + allCompressionCodecs.foreach { codec => + testApplicationEventLogging(compressionCodec = Some(codec)) + } + } + + + /* ----------------- * + * Actual test logic * + * ----------------- */ + + import EventLoggingListenerSuite._ + + /** + * Test whether names of special files are correctly identified and parsed. + */ + private def testParsingFileName() { + val logPrefix = EventLoggingListener.LOG_PREFIX + val sparkVersionPrefix = EventLoggingListener.SPARK_VERSION_PREFIX + val compressionCodecPrefix = EventLoggingListener.COMPRESSION_CODEC_PREFIX + val applicationComplete = EventLoggingListener.APPLICATION_COMPLETE + assert(EventLoggingListener.isEventLogFile(logPrefix + "0")) + assert(EventLoggingListener.isEventLogFile(logPrefix + "100")) + assert(EventLoggingListener.isEventLogFile(logPrefix + "ANYTHING")) + assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "0.9.1")) + assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "1.0.0")) + assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "ANYTHING")) + assert(EventLoggingListener.isApplicationCompleteFile(applicationComplete)) + allCompressionCodecs.foreach { codec => + assert(EventLoggingListener.isCompressionCodecFile(compressionCodecPrefix + codec)) + } + + // Negatives + assert(!EventLoggingListener.isEventLogFile("The greatest man of all mankind")) + assert(!EventLoggingListener.isSparkVersionFile("Will never falter in the face of death!")) + assert(!EventLoggingListener.isCompressionCodecFile("Unless he chooses to leave behind")) + assert(!EventLoggingListener.isApplicationCompleteFile("The very treasure he calls Macbeth")) + + // Verify that parsing is correct + assert(EventLoggingListener.parseSparkVersion(sparkVersionPrefix + "1.0.0") === "1.0.0") + allCompressionCodecs.foreach { codec => + assert(EventLoggingListener.parseCompressionCodec(compressionCodecPrefix + codec) === codec) + } + } + + /** + * Test whether the special files produced by EventLoggingListener exist. + * + * There should be exactly one event log and one spark version file throughout the entire + * execution. If a compression codec is specified, then the compression codec file should + * also exist. Only after the application has completed does the test expect the application + * completed file to be present. + */ + private def testSpecialFilesExist(compressionCodec: Option[String] = None) { + + def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) { + val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0 + val numApplicationCompleteFiles = if (loggerStopped) 1 else 0 + assert(logFiles.size === 2 + numCompressionCodecFiles + numApplicationCompleteFiles) + assert(eventLogsExist(logFiles)) + assert(sparkVersionExists(logFiles)) + assert(compressionCodecExists(logFiles) === compressionCodec.isDefined) + assert(applicationCompleteExists(logFiles) === loggerStopped) + assertSparkVersionIsValid(logFiles) + compressionCodec.foreach { codec => + assertCompressionCodecIsValid(logFiles, codec) + } + } + + // Verify logging directory exists + val conf = getLoggingConf(logDirPath, compressionCodec) + val eventLogger = new EventLoggingListener("test", conf) + eventLogger.start() + val logPath = new Path(eventLogger.logDir) + assert(fileSystem.exists(logPath)) + val logDir = fileSystem.getFileStatus(logPath) + assert(logDir.isDir) + + // Verify special files are as expected before stop() + var logFiles = fileSystem.listStatus(logPath) + assert(logFiles != null) + assertFilesExist(logFiles, loggerStopped = false) + + // Verify special files are as expected after stop() + eventLogger.stop() + logFiles = fileSystem.listStatus(logPath) + assertFilesExist(logFiles, loggerStopped = true) + } + + /** + * Test whether EventLoggingListener correctly parses the correct information from the logs. + * + * This includes whether it returns the correct Spark version, compression codec (if any), + * and the application's completion status. + */ + private def testParsingLogInfo(compressionCodec: Option[String] = None) { + + def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) { + assert(info.logPaths.size > 0) + assert(info.sparkVersion === SparkContext.SPARK_VERSION) + assert(info.compressionCodec.isDefined === compressionCodec.isDefined) + info.compressionCodec.foreach { codec => + assert(compressionCodec.isDefined) + val expectedCodec = compressionCodec.get.split('.').last + assert(codec.getClass.getSimpleName === expectedCodec) + } + assert(info.applicationComplete === loggerStopped) + } + + // Verify that all information is correctly parsed before stop() + val conf = getLoggingConf(logDirPath, compressionCodec) + val eventLogger = new EventLoggingListener("test", conf) + eventLogger.start() + var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) + assertInfoCorrect(eventLoggingInfo, loggerStopped = false) + + // Verify that all information is correctly parsed after stop() + eventLogger.stop() + eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) + assertInfoCorrect(eventLoggingInfo, loggerStopped = true) + } + + /** + * Test basic event logging functionality. + * + * This creates two simple events, posts them to the EventLoggingListener, and verifies that + * exactly these two events are logged in the expected file. + */ + private def testEventLogging(compressionCodec: Option[String] = None) { + val conf = getLoggingConf(logDirPath, compressionCodec) + val eventLogger = new EventLoggingListener("test", conf) + val listenerBus = new LiveListenerBus + val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey") + val applicationEnd = SparkListenerApplicationEnd(1000L) + + // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite + eventLogger.start() + listenerBus.start() + listenerBus.addListener(eventLogger) + listenerBus.postToAll(applicationStart) + listenerBus.postToAll(applicationEnd) + + // Verify file contains exactly the two events logged + val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) + assert(eventLoggingInfo.logPaths.size > 0) + val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) + assert(lines.size === 2) + assert(lines(0).contains("SparkListenerApplicationStart")) + assert(lines(1).contains("SparkListenerApplicationEnd")) + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart) + assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd) + eventLogger.stop() + } + + /** + * Test end-to-end event logging functionality in an application. + * This runs a simple Spark job and asserts that the expected events are logged when expected. + */ + private def testApplicationEventLogging(compressionCodec: Option[String] = None) { + val conf = getLoggingConf(logDirPath, compressionCodec) + val sc = new SparkContext("local", "test", conf) + assert(sc.eventLogger.isDefined) + val eventLogger = sc.eventLogger.get + val expectedLogDir = logDirPath.toString + assert(eventLogger.logDir.startsWith(expectedLogDir)) + + // Begin listening for events that trigger asserts + val eventExistenceListener = new EventExistenceListener(eventLogger) + sc.addSparkListener(eventExistenceListener) + + // Trigger asserts for whether the expected events are actually logged + sc.parallelize(1 to 10000).count() + sc.stop() + + // Ensure all asserts have actually been triggered + eventExistenceListener.assertAllCallbacksInvoked() + } + + /** + * Assert that all of the specified events are logged by the given EventLoggingListener. + */ + private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) { + val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) + assert(eventLoggingInfo.logPaths.size > 0) + val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) + val eventSet = mutable.Set(events: _*) + lines.foreach { line => + eventSet.foreach { event => + if (line.contains(event)) { + val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line)) + val eventType = Utils.getFormattedClassName(parsedEvent) + if (eventType == event) { + eventSet.remove(event) + } + } + } + } + assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq) + } + + /** + * Read all lines from the file specified by the given path. + * If a compression codec is specified, use it to read the file. + */ + private def readFileLines( + filePath: Path, + compressionCodec: Option[CompressionCodec]): Seq[String] = { + val fstream = fileSystem.open(filePath) + val cstream = + compressionCodec.map { codec => + codec.compressedInputStream(fstream) + }.getOrElse(fstream) + Source.fromInputStream(cstream).getLines().toSeq + } + + /** + * A listener that asserts certain events are logged by the given EventLoggingListener. + * This is necessary because events are posted asynchronously in a different thread. + */ + private class EventExistenceListener(eventLogger: EventLoggingListener) extends SparkListener { + var jobStarted = false + var jobEnded = false + var appEnded = false + + override def onJobStart(jobStart: SparkListenerJobStart) { + assertEventsExist(eventLogger, Seq[String]( + Utils.getFormattedClassName(SparkListenerApplicationStart), + Utils.getFormattedClassName(SparkListenerBlockManagerAdded), + Utils.getFormattedClassName(SparkListenerEnvironmentUpdate) + )) + jobStarted = true + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd) { + assertEventsExist(eventLogger, Seq[String]( + Utils.getFormattedClassName(SparkListenerJobStart), + Utils.getFormattedClassName(SparkListenerJobEnd), + Utils.getFormattedClassName(SparkListenerStageSubmitted), + Utils.getFormattedClassName(SparkListenerStageCompleted), + Utils.getFormattedClassName(SparkListenerTaskStart), + Utils.getFormattedClassName(SparkListenerTaskEnd) + )) + jobEnded = true + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { + assertEventsExist(eventLogger, Seq[String]( + Utils.getFormattedClassName(SparkListenerApplicationEnd) + )) + appEnded = true + } + + def assertAllCallbacksInvoked() { + assert(jobStarted, "JobStart callback not invoked!") + assert(jobEnded, "JobEnd callback not invoked!") + assert(appEnded, "ApplicationEnd callback not invoked!") + } + } + + + /* -------------------------------------------------------- * + * Helper methods for validating state of the special files * + * -------------------------------------------------------- */ + + private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = { + logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile) + } + + private def sparkVersionExists(logFiles: Array[FileStatus]): Boolean = { + logFiles.map(_.getPath.getName).exists(EventLoggingListener.isSparkVersionFile) + } + + private def compressionCodecExists(logFiles: Array[FileStatus]): Boolean = { + logFiles.map(_.getPath.getName).exists(EventLoggingListener.isCompressionCodecFile) + } + + private def applicationCompleteExists(logFiles: Array[FileStatus]): Boolean = { + logFiles.map(_.getPath.getName).exists(EventLoggingListener.isApplicationCompleteFile) + } + + private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) { + val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile) + assert(file.isDefined) + assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION) + } + + private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) { + val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isCompressionCodecFile) + assert(file.isDefined) + assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec) + } + +} + + +object EventLoggingListenerSuite { + + /** Get a SparkConf with event logging enabled. */ + def getLoggingConf(logDir: Path, compressionCodec: Option[String] = None) = { + val conf = new SparkConf + conf.set("spark.eventLog.enabled", "true") + conf.set("spark.eventLog.testing", "true") + conf.set("spark.eventLog.dir", logDir.toString) + compressionCodec.foreach { codec => + conf.set("spark.eventLog.compress", "true") + conf.set("spark.io.compression.codec", codec) + } + conf + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala new file mode 100644 index 0000000000..d1fe1fc348 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io.PrintWriter + +import scala.util.Try + +import com.google.common.io.Files +import org.json4s.jackson.JsonMethods._ +import org.scalatest.{BeforeAndAfter, FunSuite} + +import org.apache.spark.SparkContext._ +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.{JsonProtocol, Utils} + +/** + * Test whether ReplayListenerBus replays events from logs correctly. + */ +class ReplayListenerSuite extends FunSuite with BeforeAndAfter { + private val fileSystem = Utils.getHadoopFileSystem("/") + private val allCompressionCodecs = Seq[String]( + "org.apache.spark.io.LZFCompressionCodec", + "org.apache.spark.io.SnappyCompressionCodec" + ) + private val testDir = Files.createTempDir() + + after { + Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) } + Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) } + } + + test("Simple replay") { + testSimpleReplay() + } + + test("Simple replay with compression") { + allCompressionCodecs.foreach { codec => + testSimpleReplay(Some(codec)) + } + } + + // This assumes the correctness of EventLoggingListener + test("End-to-end replay") { + testApplicationReplay() + } + + // This assumes the correctness of EventLoggingListener + test("End-to-end replay with compression") { + allCompressionCodecs.foreach { codec => + testApplicationReplay(Some(codec)) + } + } + + + /* ----------------- * + * Actual test logic * + * ----------------- */ + + /** + * Test simple replaying of events. + */ + private def testSimpleReplay(codecName: Option[String] = None) { + val logFilePath = Utils.getFilePath(testDir, "events.txt") + val codec = codecName.map(getCompressionCodec) + val fstream = fileSystem.create(logFilePath) + val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) + val writer = new PrintWriter(cstream) + val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey") + val applicationEnd = SparkListenerApplicationEnd(1000L) + writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) + writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) + writer.close() + val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec) + val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName) + val eventMonster = new EventMonster(conf) + replayer.addListener(eventMonster) + replayer.replay() + assert(eventMonster.loggedEvents.size === 2) + assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart)) + assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd)) + } + + /** + * Test end-to-end replaying of events. + * + * This test runs a few simple jobs with event logging enabled, and compares each emitted + * event to the corresponding event replayed from the event logs. This test makes the + * assumption that the event logging behavior is correct (tested in a separate suite). + */ + private def testApplicationReplay(codecName: Option[String] = None) { + val logDirPath = Utils.getFilePath(testDir, "test-replay") + val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName) + val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf) + + // Run a few jobs + sc.parallelize(1 to 100, 1).count() + sc.parallelize(1 to 100, 2).map(i => (i, i)).count() + sc.parallelize(1 to 100, 3).map(i => (i, i)).groupByKey().count() + sc.parallelize(1 to 100, 4).map(i => (i, i)).groupByKey().persist().count() + sc.stop() + + // Prepare information needed for replay + val codec = codecName.map(getCompressionCodec) + val applications = fileSystem.listStatus(logDirPath) + assert(applications != null && applications.size > 0) + val eventLogDir = applications.sortBy(_.getAccessTime).last + assert(eventLogDir.isDir) + val logFiles = fileSystem.listStatus(eventLogDir.getPath) + assert(logFiles != null && logFiles.size > 0) + val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_")) + assert(logFile.isDefined) + val logFilePath = logFile.get.getPath + + // Replay events + val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec) + val eventMonster = new EventMonster(conf) + replayer.addListener(eventMonster) + replayer.replay() + + // Verify the same events are replayed in the same order + assert(sc.eventLogger.isDefined) + val originalEvents = sc.eventLogger.get.loggedEvents + val replayedEvents = eventMonster.loggedEvents + originalEvents.zip(replayedEvents).foreach { case (e1, e2) => assert(e1 === e2) } + } + + /** + * A simple listener that buffers all the events it receives. + * + * The event buffering functionality must be implemented within EventLoggingListener itself. + * This is because of the following race condition: the event may be mutated between being + * processed by one listener and being processed by another. Thus, in order to establish + * a fair comparison between the original events and the replayed events, both functionalities + * must be implemented within one listener (i.e. the EventLoggingListener). + * + * This child listener inherits only the event buffering functionality, but does not actually + * log the events. + */ + private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) { + logger.close() + } + + private def getCompressionCodec(codecName: String) = { + val conf = new SparkConf + conf.set("spark.io.compression.codec", codecName) + CompressionCodec.createCodec(conf) + } + +} diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala new file mode 100644 index 0000000000..f675e1e5b4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.io.IOException + +import scala.io.Source +import scala.util.Try + +import com.google.common.io.Files +import org.apache.hadoop.fs.Path +import org.scalatest.{BeforeAndAfter, FunSuite} + +import org.apache.spark.SparkConf +import org.apache.spark.io.CompressionCodec + +/** + * Test writing files through the FileLogger. + */ +class FileLoggerSuite extends FunSuite with BeforeAndAfter { + private val fileSystem = Utils.getHadoopFileSystem("/") + private val allCompressionCodecs = Seq[String]( + "org.apache.spark.io.LZFCompressionCodec", + "org.apache.spark.io.SnappyCompressionCodec" + ) + private val testDir = Files.createTempDir() + private val logDirPath = Utils.getFilePath(testDir, "test-file-logger") + private val logDirPathString = logDirPath.toString + + after { + Try { fileSystem.delete(logDirPath, true) } + } + + test("Simple logging") { + testSingleFile() + } + + test ("Simple logging with compression") { + allCompressionCodecs.foreach { codec => + testSingleFile(Some(codec)) + } + } + + test("Logging multiple files") { + testMultipleFiles() + } + + test("Logging multiple files with compression") { + allCompressionCodecs.foreach { codec => + testMultipleFiles(Some(codec)) + } + } + + test("Logging when directory already exists") { + // Create the logging directory multiple times + new FileLogger(logDirPathString, new SparkConf, overwrite = true).start() + new FileLogger(logDirPathString, new SparkConf, overwrite = true).start() + new FileLogger(logDirPathString, new SparkConf, overwrite = true).start() + + // If overwrite is not enabled, an exception should be thrown + intercept[IOException] { + new FileLogger(logDirPathString, new SparkConf, overwrite = false).start() + } + } + + + /* ----------------- * + * Actual test logic * + * ----------------- */ + + /** + * Test logging to a single file. + */ + private def testSingleFile(codecName: Option[String] = None) { + val conf = getLoggingConf(codecName) + val codec = codecName.map { c => CompressionCodec.createCodec(conf) } + val logger = + if (codecName.isDefined) { + new FileLogger(logDirPathString, conf, compress = true) + } else { + new FileLogger(logDirPathString, conf) + } + logger.start() + assert(fileSystem.exists(logDirPath)) + assert(fileSystem.getFileStatus(logDirPath).isDir) + assert(fileSystem.listStatus(logDirPath).size === 0) + + logger.newFile() + val files = fileSystem.listStatus(logDirPath) + assert(files.size === 1) + val firstFile = files.head + val firstFilePath = firstFile.getPath + + logger.log("hello") + logger.flush() + assert(readFileContent(firstFilePath, codec) === "hello") + + logger.log(" world") + logger.close() + assert(readFileContent(firstFilePath, codec) === "hello world") + } + + /** + * Test logging to multiple files. + */ + private def testMultipleFiles(codecName: Option[String] = None) { + val conf = getLoggingConf(codecName) + val codec = codecName.map { c => CompressionCodec.createCodec(conf) } + val logger = + if (codecName.isDefined) { + new FileLogger(logDirPathString, conf, compress = true) + } else { + new FileLogger(logDirPathString, conf) + } + logger.start() + logger.newFile("Jean_Valjean") + logger.logLine("Who am I?") + logger.logLine("Destiny?") + logger.newFile("John_Valjohn") + logger.logLine("One") + logger.logLine("Two three...") + logger.newFile("Wolverine") + logger.logLine("There was a time") + logger.logLine("A time when our enemies knew honor.") + logger.close() + assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?") + assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...") + assert(readFileContent(new Path(logDirPath, "Wolverine"), codec) === + "There was a time\nA time when our enemies knew honor.") + } + + /** + * Read the content of the file specified by the given path. + * If a compression codec is specified, use it to read the file. + */ + private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = { + val fstream = fileSystem.open(logPath) + val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream) + Source.fromInputStream(cstream).getLines().mkString("\n") + } + + private def getLoggingConf(codecName: Option[String]) = { + val conf = new SparkConf + codecName.foreach { c => conf.set("spark.io.compression.codec", c) } + conf + } + +} |