aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala225
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala32
4 files changed, 262 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index bfae32dae0..01ab2d5493 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.SparkConf
class JsonProtocolSuite extends FunSuite {
@@ -116,7 +117,8 @@ class JsonProtocolSuite extends FunSuite {
}
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
- new File("sparkHome"), new File("workDir"), "akka://worker", ExecutorState.RUNNING)
+ new File("sparkHome"), new File("workDir"), "akka://worker",
+ new SparkConf, ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 8ae387fa0b..e5f748d555 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.scalatest.FunSuite
import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.SparkConf
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
@@ -32,7 +33,7 @@ class ExecutorRunnerTest extends FunSuite {
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
- f("ooga"), "blah", ExecutorState.RUNNING)
+ f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
assert(er.getCommandSeq.last === appId)
}
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
new file mode 100644
index 0000000000..53d7f5c607
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io._
+
+import scala.collection.mutable.HashSet
+import scala.reflect._
+
+import org.apache.commons.io.{FileUtils, IOUtils}
+import org.apache.spark.{Logging, SparkConf}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy, FileAppender}
+
+class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
+
+ val testFile = new File("FileAppenderSuite-test-" + System.currentTimeMillis).getAbsoluteFile
+
+ before {
+ cleanup()
+ }
+
+ after {
+ cleanup()
+ }
+
+ test("basic file appender") {
+ val testString = (1 to 1000).mkString(", ")
+ val inputStream = IOUtils.toInputStream(testString)
+ val appender = new FileAppender(inputStream, testFile)
+ inputStream.close()
+ appender.awaitTermination()
+ assert(FileUtils.readFileToString(testFile) === testString)
+ }
+
+ test("rolling file appender - time-based rolling") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverIntervalMillis = 100
+ val durationMillis = 1000
+ val numRollovers = durationMillis / rolloverIntervalMillis
+ val textToAppend = (1 to numRollovers).map( _.toString * 10 )
+
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false),
+ new SparkConf(), 10)
+
+ testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis)
+ }
+
+ test("rolling file appender - size-based rolling") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverSize = 1000
+ val textToAppend = (1 to 3).map( _.toString * 1000 )
+
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new SizeBasedRollingPolicy(rolloverSize, false), new SparkConf(), 99)
+
+ val files = testRolling(appender, testOutputStream, textToAppend, 0)
+ files.foreach { file =>
+ logInfo(file.toString + ": " + file.length + " bytes")
+ assert(file.length <= rolloverSize)
+ }
+ }
+
+ test("rolling file appender - cleaning") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val conf = new SparkConf().set(RollingFileAppender.RETAINED_FILES_PROPERTY, "10")
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new SizeBasedRollingPolicy(1000, false), conf, 10)
+
+ // send data to appender through the input stream, and wait for the data to be written
+ val allGeneratedFiles = new HashSet[String]()
+ val items = (1 to 10).map { _.toString * 10000 }
+ for (i <- 0 until items.size) {
+ testOutputStream.write(items(i).getBytes("UTF8"))
+ testOutputStream.flush()
+ allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles(
+ testFile.getParentFile.toString, testFile.getName).map(_.toString)
+
+ Thread.sleep(10)
+ }
+ testOutputStream.close()
+ appender.awaitTermination()
+ logInfo("Appender closed")
+
+ // verify whether the earliest file has been deleted
+ val rolledOverFiles = allGeneratedFiles.filter { _ != testFile.toString }.toArray.sorted
+ logInfo(s"All rolled over files generated:${rolledOverFiles.size}\n" + rolledOverFiles.mkString("\n"))
+ assert(rolledOverFiles.size > 2)
+ val earliestRolledOverFile = rolledOverFiles.head
+ val existingRolledOverFiles = RollingFileAppender.getSortedRolledOverFiles(
+ testFile.getParentFile.toString, testFile.getName).map(_.toString)
+ logInfo("Existing rolled over files:\n" + existingRolledOverFiles.mkString("\n"))
+ assert(!existingRolledOverFiles.toSet.contains(earliestRolledOverFile))
+ }
+
+ test("file appender selection") {
+ // Test whether FileAppender.apply() returns the right type of the FileAppender based
+ // on SparkConf settings.
+
+ def testAppenderSelection[ExpectedAppender: ClassTag, ExpectedRollingPolicy](
+ properties: Seq[(String, String)], expectedRollingPolicyParam: Long = -1): FileAppender = {
+
+ // Set spark conf properties
+ val conf = new SparkConf
+ properties.foreach { p =>
+ conf.set(p._1, p._2)
+ }
+
+ // Create and test file appender
+ val inputStream = new PipedInputStream(new PipedOutputStream())
+ val appender = FileAppender(inputStream, new File("stdout"), conf)
+ assert(appender.isInstanceOf[ExpectedAppender])
+ assert(appender.getClass.getSimpleName ===
+ classTag[ExpectedAppender].runtimeClass.getSimpleName)
+ if (appender.isInstanceOf[RollingFileAppender]) {
+ val rollingPolicy = appender.asInstanceOf[RollingFileAppender].rollingPolicy
+ rollingPolicy.isInstanceOf[ExpectedRollingPolicy]
+ val policyParam = if (rollingPolicy.isInstanceOf[TimeBasedRollingPolicy]) {
+ rollingPolicy.asInstanceOf[TimeBasedRollingPolicy].rolloverIntervalMillis
+ } else {
+ rollingPolicy.asInstanceOf[SizeBasedRollingPolicy].rolloverSizeBytes
+ }
+ assert(policyParam === expectedRollingPolicyParam)
+ }
+ appender
+ }
+
+ import RollingFileAppender._
+
+ def rollingStrategy(strategy: String) = Seq(STRATEGY_PROPERTY -> strategy)
+ def rollingSize(size: String) = Seq(SIZE_PROPERTY -> size)
+ def rollingInterval(interval: String) = Seq(INTERVAL_PROPERTY -> interval)
+
+ val msInDay = 24 * 60 * 60 * 1000L
+ val msInHour = 60 * 60 * 1000L
+ val msInMinute = 60 * 1000L
+
+ // test no strategy -> no rolling
+ testAppenderSelection[FileAppender, Any](Seq.empty)
+
+ // test time based rolling strategy
+ testAppenderSelection[RollingFileAppender, Any](rollingStrategy("time"), msInDay)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("daily"), msInDay)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("hourly"), msInHour)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("minutely"), msInMinute)
+ testAppenderSelection[RollingFileAppender, TimeBasedRollingPolicy](
+ rollingStrategy("time") ++ rollingInterval("123456789"), 123456789 * 1000L)
+ testAppenderSelection[FileAppender, Any](
+ rollingStrategy("time") ++ rollingInterval("xyz"))
+
+ // test size based rolling strategy
+ testAppenderSelection[RollingFileAppender, SizeBasedRollingPolicy](
+ rollingStrategy("size") ++ rollingSize("123456789"), 123456789)
+ testAppenderSelection[FileAppender, Any](rollingSize("xyz"))
+
+ // test illegal strategy
+ testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
+ }
+
+ /**
+ * Run the rolling file appender with data and see whether all the data was written correctly
+ * across rolled over files.
+ */
+ def testRolling(
+ appender: FileAppender,
+ outputStream: OutputStream,
+ textToAppend: Seq[String],
+ sleepTimeBetweenTexts: Long
+ ): Seq[File] = {
+ // send data to appender through the input stream, and wait for the data to be written
+ val expectedText = textToAppend.mkString("")
+ for (i <- 0 until textToAppend.size) {
+ outputStream.write(textToAppend(i).getBytes("UTF8"))
+ outputStream.flush()
+ Thread.sleep(sleepTimeBetweenTexts)
+ }
+ logInfo("Data sent to appender")
+ outputStream.close()
+ appender.awaitTermination()
+ logInfo("Appender closed")
+
+ // verify whether all the data written to rolled over files is same as expected
+ val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
+ testFile.getParentFile.toString, testFile.getName)
+ logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
+ assert(generatedFiles.size > 1)
+ val allText = generatedFiles.map { file =>
+ FileUtils.readFileToString(file)
+ }.mkString("")
+ assert(allText === expectedText)
+ generatedFiles
+ }
+
+ /** Delete all the generated rolledover files */
+ def cleanup() {
+ testFile.getParentFile.listFiles.filter { file =>
+ file.getName.startsWith(testFile.getName)
+ }.foreach { _.delete() }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 0aad882ed7..1ee936bc78 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -140,6 +140,38 @@ class UtilsSuite extends FunSuite {
Utils.deleteRecursively(tmpDir2)
}
+ test("reading offset bytes across multiple files") {
+ val tmpDir = Files.createTempDir()
+ tmpDir.deleteOnExit()
+ val files = (1 to 3).map(i => new File(tmpDir, i.toString))
+ Files.write("0123456789", files(0), Charsets.UTF_8)
+ Files.write("abcdefghij", files(1), Charsets.UTF_8)
+ Files.write("ABCDEFGHIJ", files(2), Charsets.UTF_8)
+
+ // Read first few bytes in the 1st file
+ assert(Utils.offsetBytes(files, 0, 5) === "01234")
+
+ // Read bytes within the 1st file
+ assert(Utils.offsetBytes(files, 5, 8) === "567")
+
+ // Read bytes across 1st and 2nd file
+ assert(Utils.offsetBytes(files, 8, 18) === "89abcdefgh")
+
+ // Read bytes across 1st, 2nd and 3rd file
+ assert(Utils.offsetBytes(files, 5, 24) === "56789abcdefghijABCD")
+
+ // Read some nonexistent bytes in the beginning
+ assert(Utils.offsetBytes(files, -5, 18) === "0123456789abcdefgh")
+
+ // Read some nonexistent bytes at the end
+ assert(Utils.offsetBytes(files, 18, 35) === "ijABCDEFGHIJ")
+
+ // Read some nonexistent bytes on both ends
+ assert(Utils.offsetBytes(files, -5, 35) === "0123456789abcdefghijABCDEFGHIJ")
+
+ Utils.deleteRecursively(tmpDir)
+ }
+
test("deserialize long value") {
val testval : Long = 9730889947L
val bbuf = ByteBuffer.allocate(8)