diff options
Diffstat (limited to 'streaming/src/test/scala')
3 files changed, 46 insertions, 6 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 7db17abb79..081f5a1c93 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite : Seq[ReceivedBlockTrackerLogEvent] = { logFiles.flatMap { file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq - }.map { byteBuffer => - Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) + }.flatMap { byteBuffer => + val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) { + Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap) + } else { + Array(byteBuffer) + } + validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array())) }.toList } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 4273fd7dda..7f80d6ecdb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -20,7 +20,7 @@ import java.io._ import java.nio.ByteBuffer import java.util.{Iterator => JIterator} import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor} +import java.util.concurrent.{RejectedExecutionException, TimeUnit, CountDownLatch, ThreadPoolExecutor} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -190,6 +190,28 @@ abstract class CommonWriteAheadLogTests( } assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment") } + + test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") { + // write some data + val writtenData = (1 to 10).map { i => + val data = generateRandomData() + val file = testDir + s"/log-$i-$i" + writeDataManually(data, file, allowBatching) + data + }.flatten + + val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching) + // create iterator but don't materialize it + val readData = wal.readAll().asScala.map(byteBufferToString) + wal.close() + if (closeFileAfterWrite) { + // the threadpool is shutdown by the wal.close call above, therefore we shouldn't be able + // to materialize the iterator with parallel recovery + intercept[RejectedExecutionException](readData.toArray) + } else { + assert(readData.toSeq === writtenData) + } + } } class FileBasedWriteAheadLogSuite diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala index 9152728191..bfc5b0cf60 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala @@ -56,19 +56,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite { test("log selection and creation") { val emptyConf = new SparkConf() // no log configuration - assertDriverLogClass[FileBasedWriteAheadLog](emptyConf) + assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched = true) assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf) // Verify setting driver WAL class val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class", classOf[MockWriteAheadLog0].getName()) - assertDriverLogClass[MockWriteAheadLog0](driverWALConf) + assertDriverLogClass[MockWriteAheadLog0](driverWALConf, isBatched = true) assertReceiverLogClass[FileBasedWriteAheadLog](driverWALConf) // Verify setting receiver WAL class val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class", classOf[MockWriteAheadLog0].getName()) - assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf) + assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true) assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf) // Verify setting receiver WAL class with 1-arg constructor @@ -104,6 +104,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite { assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true) assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf) } + + test("batching is enabled by default in WriteAheadLog") { + val conf = new SparkConf() + assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) + // batching is not valid for receiver WALs + assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false)) + } + + test("closeFileAfterWrite is disabled by default in WriteAheadLog") { + val conf = new SparkConf() + assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true)) + assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false)) + } } object WriteAheadLogUtilsSuite { |