Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala    9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala      24
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala 19
3 files changed, 46 insertions, 6 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 7db17abb79..081f5a1c93 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite
     : Seq[ReceivedBlockTrackerLogEvent] = {
     logFiles.flatMap {
       file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
-    }.map { byteBuffer =>
-      Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
+    }.flatMap { byteBuffer =>
+      val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) {
+        Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap)
+      } else {
+        Array(byteBuffer)
+      }
+      validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array()))
     }.toList
   }
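The reader-side change above mirrors the batched WAL record framing: with batching enabled, each record returned by the reader is itself a serialized Array[Array[Byte]] holding one serialized event per element, so recovery must unwrap the outer array before deserializing the individual events. Below is a minimal round-trip sketch of that framing, using plain Java serialization as a stand-in for Spark's Utils.serialize/Utils.deserialize; the serialize/deserialize helpers and the string payloads are illustrative, not Spark's API.

import java.io._
import java.nio.ByteBuffer

// Stand-ins for Utils.serialize / Utils.deserialize (plain Java serialization).
def serialize(obj: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)
  oos.close()
  bos.toByteArray
}

def deserialize[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try ois.readObject().asInstanceOf[T] finally ois.close()
}

// Writer side: a batched record is one buffer wrapping N serialized events.
val events = Seq("event-1", "event-2", "event-3") // stand-ins for tracker log events
val batchedRecord = ByteBuffer.wrap(serialize(events.map(serialize).toArray))

// Recovery side, mirroring the new flatMap: unwrap the outer array first,
// then deserialize each inner payload individually.
val recovered = deserialize[Array[Array[Byte]]](batchedRecord.array())
  .map(ByteBuffer.wrap)
  .map(b => deserialize[String](b.array()))
assert(recovered.toSeq == events)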
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 4273fd7dda..7f80d6ecdb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -20,7 +20,7 @@ import java.io._
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
+import java.util.concurrent.{RejectedExecutionException, TimeUnit, CountDownLatch, ThreadPoolExecutor}

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -190,6 +190,28 @@ abstract class CommonWriteAheadLogTests(
     }
     assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
   }
+
+  test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") {
+    // write some data
+    val writtenData = (1 to 10).map { i =>
+      val data = generateRandomData()
+      val file = testDir + s"/log-$i-$i"
+      writeDataManually(data, file, allowBatching)
+      data
+    }.flatten
+
+    val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching)
+    // create the iterator but don't materialize it yet
+    val readData = wal.readAll().asScala.map(byteBufferToString)
+    wal.close()
+    if (closeFileAfterWrite) {
+      // the thread pool is shut down by the wal.close() call above, so we shouldn't be
+      // able to materialize the iterator with parallel recovery
+      intercept[RejectedExecutionException](readData.toArray)
+    } else {
+      assert(readData.toSeq === writtenData)
+    }
+  }
 }

 class FileBasedWriteAheadLogSuite
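The new test relies on standard ExecutorService semantics: after shutdown(), any further task submission fails fast with RejectedExecutionException, which is what the lazily materialized recovery iterator hits once wal.close() has torn down the pool used for parallel recovery. A self-contained sketch of just that behavior, independent of the WAL classes:

import java.util.concurrent.{Executors, RejectedExecutionException}

val pool = Executors.newFixedThreadPool(2)
pool.submit(new Runnable { def run(): Unit = println("accepted before shutdown") })
pool.shutdown()

// Submissions after shutdown() are rejected immediately; the test's
// readData.toArray triggers exactly this when it finally schedules work
// on the already-closed WAL's thread pool.
try {
  pool.submit(new Runnable { def run(): Unit = () })
} catch {
  case _: RejectedExecutionException => println("rejected after shutdown")
}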
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
index 9152728191..bfc5b0cf60 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
@@ -56,19 +56,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
   test("log selection and creation") {

     val emptyConf = new SparkConf()  // no log configuration
-    assertDriverLogClass[FileBasedWriteAheadLog](emptyConf)
+    assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched = true)
     assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf)

     // Verify setting driver WAL class
     val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
       classOf[MockWriteAheadLog0].getName())
-    assertDriverLogClass[MockWriteAheadLog0](driverWALConf)
+    assertDriverLogClass[MockWriteAheadLog0](driverWALConf, isBatched = true)
     assertReceiverLogClass[FileBasedWriteAheadLog](driverWALConf)

     // Verify setting receiver WAL class
     val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
       classOf[MockWriteAheadLog0].getName())
-    assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf)
+    assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
     assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)

     // Verify setting receiver WAL class with 1-arg constructor
@@ -104,6 +104,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
     assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
     assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
   }
+
+  test("batching is enabled by default in WriteAheadLog") {
+    val conf = new SparkConf()
+    assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true))
+    // batching is not valid for receiver WALs
+    assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false))
+  }
+
+  test("closeFileAfterWrite is disabled by default in WriteAheadLog") {
+    val conf = new SparkConf()
+    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true))
+    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false))
+  }
 }

 object WriteAheadLogUtilsSuite {
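The two new tests pin down the configuration defaults: batching is on for the driver WAL and never applies to receiver WALs, while closeFileAfterWrite defaults to off on both sides. A plausible sketch of how those lookups resolve, with the property names assumed for illustration (the real constants and logic live in WriteAheadLogUtils):

import org.apache.spark.SparkConf

// Assumed property names; the actual keys are defined in WriteAheadLogUtils.
def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean =
  isDriver && conf.getBoolean("spark.streaming.driver.writeAheadLog.allowBatching", true)

def shouldCloseFileAfterWrite(conf: SparkConf, isDriver: Boolean): Boolean =
  if (isDriver) {
    conf.getBoolean("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", false)
  } else {
    conf.getBoolean("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", false)
  }

val conf = new SparkConf()
assert(isBatchingEnabled(conf, isDriver = true))   // batched driver WAL by default
assert(!isBatchingEnabled(conf, isDriver = false)) // receivers never batch
assert(!shouldCloseFileAfterWrite(conf, isDriver = true))
assert(!shouldCloseFileAfterWrite(conf, isDriver = false))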