author      Yu Peng <loneknightpy@gmail.com>              2016-10-18 13:23:31 -0700
committer   Tathagata Das <tathagata.das1565@gmail.com>   2016-10-18 13:23:31 -0700
commit      231f39e3f6641953a90bc4c40444ede63f363b23
tree        679bf48c57a3e00196e9f610d1bc0bf386bcd97f /core/src/test
parent      37686539f546ac7a3657dbfc59b7ac982b4b9bce
[SPARK-17711] Compress rolled executor log
## What changes were proposed in this pull request?

This PR adds support for executor log compression.

## How was this patch tested?

Unit tests.

cc: yhuai tdas mengxr

Author: Yu Peng <loneknightpy@gmail.com>

Closes #15285 from loneknightpy/compress-executor-log.
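For readers of the patch, here is a minimal, illustrative sketch (not part of this change) of how an application might turn on the compression exercised by the tests below. The `spark.executor.logs.rolling.enableCompression` key is the new property under test; the other rolling settings are existing Spark configuration.

```scala
import org.apache.spark.SparkConf

// Sketch only: roll executor logs by size and gzip each file as it is rolled over.
val conf = new SparkConf()
  .set("spark.executor.logs.rolling.strategy", "size")          // roll by file size
  .set("spark.executor.logs.rolling.maxSize", "1048576")        // ~1 MB per rolled file
  .set("spark.executor.logs.rolling.maxRetainedFiles", "5")     // keep only the newest files
  .set("spark.executor.logs.rolling.enableCompression", "true") // the new property this patch tests
```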
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala    6
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala          60
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala                 92
3 files changed, 131 insertions, 27 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
index 72eaffb416..4c3e967779 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ui/LogPageSuite.scala
@@ -22,16 +22,20 @@ import java.io.{File, FileWriter}
import org.mockito.Mockito.{mock, when}
import org.scalatest.PrivateMethodTester
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.worker.Worker
class LogPageSuite extends SparkFunSuite with PrivateMethodTester {
test("get logs simple") {
val webui = mock(classOf[WorkerWebUI])
+ val worker = mock(classOf[Worker])
val tmpDir = new File(sys.props("java.io.tmpdir"))
val workDir = new File(tmpDir, "work-dir")
workDir.mkdir()
when(webui.workDir).thenReturn(workDir)
+ when(webui.worker).thenReturn(worker)
+ when(worker.conf).thenReturn(new SparkConf())
val logPage = new LogPage(webui)
// Prepare some fake log files to read later
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 4fa9f9a8f5..7e2da8e141 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.util
import java.io._
import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch
+import java.util.zip.GZIPInputStream
import scala.collection.mutable.HashSet
import scala.reflect._
import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
import org.apache.log4j.{Appender, Level, Logger}
import org.apache.log4j.spi.LoggingEvent
import org.mockito.ArgumentCaptor
@@ -72,6 +74,25 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
testRolling(appender, testOutputStream, textToAppend, rolloverIntervalMillis)
}
+ test("rolling file appender - time-based rolling (compressed)") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverIntervalMillis = 100
+ val durationMillis = 1000
+ val numRollovers = durationMillis / rolloverIntervalMillis
+ val textToAppend = (1 to numRollovers).map( _.toString * 10 )
+
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new TimeBasedRollingPolicy(rolloverIntervalMillis, s"--HH-mm-ss-SSSS", false),
+ sparkConf, 10)
+
+ testRolling(
+ appender, testOutputStream, textToAppend, rolloverIntervalMillis, isCompressed = true)
+ }
+
test("rolling file appender - size-based rolling") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
@@ -89,6 +110,25 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
}
}
+ test("rolling file appender - size-based rolling (compressed)") {
+ // setup input stream and appender
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
+ val rolloverSize = 1000
+ val textToAppend = (1 to 3).map( _.toString * 1000 )
+
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.executor.logs.rolling.enableCompression", "true")
+ val appender = new RollingFileAppender(testInputStream, testFile,
+ new SizeBasedRollingPolicy(rolloverSize, false), sparkConf, 99)
+
+ val files = testRolling(appender, testOutputStream, textToAppend, 0, isCompressed = true)
+ files.foreach { file =>
+ logInfo(file.toString + ": " + file.length + " bytes")
+ assert(file.length < rolloverSize)
+ }
+ }
+
test("rolling file appender - cleaning") {
// setup input stream and appender
val testOutputStream = new PipedOutputStream()
@@ -273,7 +313,8 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
appender: FileAppender,
outputStream: OutputStream,
textToAppend: Seq[String],
- sleepTimeBetweenTexts: Long
+ sleepTimeBetweenTexts: Long,
+ isCompressed: Boolean = false
): Seq[File] = {
// send data to appender through the input stream, and wait for the data to be written
val expectedText = textToAppend.mkString("")
@@ -290,10 +331,23 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
// verify whether all the data written to rolled over files is same as expected
val generatedFiles = RollingFileAppender.getSortedRolledOverFiles(
testFile.getParentFile.toString, testFile.getName)
- logInfo("Filtered files: \n" + generatedFiles.mkString("\n"))
+ logInfo("Generate files: \n" + generatedFiles.mkString("\n"))
assert(generatedFiles.size > 1)
+ if (isCompressed) {
+ assert(
+ generatedFiles.filter(_.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)).size > 0)
+ }
val allText = generatedFiles.map { file =>
- Files.toString(file, StandardCharsets.UTF_8)
+ if (file.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)) {
+ val inputStream = new GZIPInputStream(new FileInputStream(file))
+ try {
+ IOUtils.toString(inputStream, StandardCharsets.UTF_8)
+ } finally {
+ IOUtils.closeQuietly(inputStream)
+ }
+ } else {
+ Files.toString(file, StandardCharsets.UTF_8)
+ }
}.mkString("")
assert(allText === expectedText)
generatedFiles
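
Outside the diff itself, the read-back pattern used by the updated `testRolling` above can be sketched as a small standalone helper (assuming the rolled-over gzip files carry a plain `.gz` suffix, as the `UtilsSuite` changes below do):

```scala
import java.io.{File, FileInputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream

import com.google.common.io.Files
import org.apache.commons.io.IOUtils

// Sketch: read a rolled executor log back as text, decompressing gzipped files first.
def readRolledLog(file: File): String = {
  if (file.getName.endsWith(".gz")) {
    val in = new GZIPInputStream(new FileInputStream(file))
    try {
      IOUtils.toString(in, StandardCharsets.UTF_8)
    } finally {
      IOUtils.closeQuietly(in)
    }
  } else {
    Files.toString(file, StandardCharsets.UTF_8)
  }
}
```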
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index bc28b2d9cb..b427f7fb50 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -25,11 +25,13 @@ import java.nio.charset.StandardCharsets
import java.text.DecimalFormatSymbols
import java.util.Locale
import java.util.concurrent.TimeUnit
+import java.util.zip.GZIPOutputStream
import scala.collection.mutable.ListBuffer
import scala.util.Random
import com.google.common.io.Files
+import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import org.apache.commons.math3.stat.inference.ChiSquareTest
import org.apache.hadoop.conf.Configuration
@@ -274,65 +276,109 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11" + sep + "00 h")
}
- test("reading offset bytes of a file") {
+ def getSuffix(isCompressed: Boolean): String = {
+ if (isCompressed) {
+ ".gz"
+ } else {
+ ""
+ }
+ }
+
+ def writeLogFile(path: String, content: Array[Byte]): Unit = {
+ val outputStream = if (path.endsWith(".gz")) {
+ new GZIPOutputStream(new FileOutputStream(path))
+ } else {
+ new FileOutputStream(path)
+ }
+ IOUtils.write(content, outputStream)
+ outputStream.close()
+ content.size
+ }
+
+ private val workerConf = new SparkConf()
+
+ def testOffsetBytes(isCompressed: Boolean): Unit = {
val tmpDir2 = Utils.createTempDir()
- val f1Path = tmpDir2 + "/f1"
- val f1 = new FileOutputStream(f1Path)
- f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
- f1.close()
+ val suffix = getSuffix(isCompressed)
+ val f1Path = tmpDir2 + "/f1" + suffix
+ writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
+ val f1Length = Utils.getFileLength(new File(f1Path), workerConf)
// Read first few bytes
- assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3")
+ assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3")
// Read some middle bytes
- assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6")
+ assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6")
// Read last few bytes
- assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n")
+ assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n")
// Read some nonexistent bytes in the beginning
- assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3")
+ assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3")
// Read some nonexistent bytes at the end
- assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n")
+ assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n")
// Read some nonexistent bytes on both ends
- assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
+ assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
Utils.deleteRecursively(tmpDir2)
}
- test("reading offset bytes across multiple files") {
+ test("reading offset bytes of a file") {
+ testOffsetBytes(isCompressed = false)
+ }
+
+ test("reading offset bytes of a file (compressed)") {
+ testOffsetBytes(isCompressed = true)
+ }
+
+ def testOffsetBytesMultipleFiles(isCompressed: Boolean): Unit = {
val tmpDir = Utils.createTempDir()
- val files = (1 to 3).map(i => new File(tmpDir, i.toString))
- Files.write("0123456789", files(0), StandardCharsets.UTF_8)
- Files.write("abcdefghij", files(1), StandardCharsets.UTF_8)
- Files.write("ABCDEFGHIJ", files(2), StandardCharsets.UTF_8)
+ val suffix = getSuffix(isCompressed)
+ val files = (1 to 3).map(i => new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4")
+ writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8))
+ writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8))
+ writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
+ writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8))
+ val fileLengths = files.map(Utils.getFileLength(_, workerConf))
// Read first few bytes in the 1st file
- assert(Utils.offsetBytes(files, 0, 5) === "01234")
+ assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234")
// Read bytes within the 1st file
- assert(Utils.offsetBytes(files, 5, 8) === "567")
+ assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567")
// Read bytes across 1st and 2nd file
- assert(Utils.offsetBytes(files, 8, 18) === "89abcdefgh")
+ assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh")
// Read bytes across 1st, 2nd and 3rd file
- assert(Utils.offsetBytes(files, 5, 24) === "56789abcdefghijABCD")
+ assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD")
+
+ // Read bytes across 3rd and 4th file
+ assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765")
// Read some nonexistent bytes in the beginning
- assert(Utils.offsetBytes(files, -5, 18) === "0123456789abcdefgh")
+ assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh")
// Read some nonexistent bytes at the end
- assert(Utils.offsetBytes(files, 18, 35) === "ijABCDEFGHIJ")
+ assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210")
// Read some nonexistent bytes on both ends
- assert(Utils.offsetBytes(files, -5, 35) === "0123456789abcdefghijABCDEFGHIJ")
+ assert(Utils.offsetBytes(files, fileLengths, -5, 45) ===
+ "0123456789abcdefghijABCDEFGHIJ9876543210")
Utils.deleteRecursively(tmpDir)
}
+ test("reading offset bytes across multiple files") {
+ testOffsetBytesMultipleFiles(isCompressed = false)
+ }
+
+ test("reading offset bytes across multiple files (compressed)") {
+ testOffsetBytesMultipleFiles(isCompressed = true)
+ }
+
test("deserialize long value") {
val testval : Long = 9730889947L
val bbuf = ByteBuffer.allocate(8)