 streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala      |  2 +-
 streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java         |  1 +
 streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala    |  9 +++++--
 streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala      | 24 +++++++++++++-
 streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala | 19 ++++++++---
 5 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
index 731a369fc9..7f9e2c9734 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
   }
 
   def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = {
-    isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false)
+    isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true)
   }
 
   /**
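
Note: this one-line change flips the default for driver-side WAL batching from disabled to enabled. A minimal sketch of the user-facing effect, assuming only the config key exercised elsewhere in this patch; WriteAheadLogUtils is package-private, so the sketch assumes it lives under org.apache.spark.streaming, like the tests below.

package org.apache.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.util.WriteAheadLogUtils

object BatchingDefaultSketch {
  def main(args: Array[String]): Unit = {
    // With no configuration, driver-side batching is now on by default.
    val defaultConf = new SparkConf()
    assert(WriteAheadLogUtils.isBatchingEnabled(defaultConf, isDriver = true))

    // Opting out restores the pre-patch behavior.
    val optOutConf = new SparkConf()
      .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")
    assert(!WriteAheadLogUtils.isBatchingEnabled(optOutConf, isDriver = true))
  }
}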
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
index 175b8a496b..09b5f8ed03 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -108,6 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
   public void testCustomWAL() {
     SparkConf conf = new SparkConf();
     conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
+    conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false");
     WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);
 
     String data1 = "data1";
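
A plausible reading of why this test must now opt out of batching, sketched in Scala: with batching enabled, createLogForDriver presumably returns the configured log wrapped in a batching decorator rather than the custom class itself, so an instance check would fail. This is an assumption inferred from the test change, not confirmed elsewhere in this diff.

package org.apache.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.util.WriteAheadLogUtils

object CustomWALSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.streaming.driver.writeAheadLog.class", classOf[JavaWriteAheadLogSuite].getName)
      .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")
    val wal = WriteAheadLogUtils.createLogForDriver(conf, null, null)
    // Only holds with batching off; with batching on, the returned log would
    // presumably be the batching wrapper, not the custom class.
    assert(wal.isInstanceOf[JavaWriteAheadLogSuite])
  }
}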
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 7db17abb79..081f5a1c93 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite
       : Seq[ReceivedBlockTrackerLogEvent] = {
     logFiles.flatMap {
       file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
-    }.map { byteBuffer =>
-      Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
+    }.flatMap { byteBuffer =>
+      val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) {
+        Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap)
+      } else {
+        Array(byteBuffer)
+      }
+      validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array()))
     }.toList
   }
 
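
For context, the decode above mirrors the batched record framing: one WAL record holds a serialized Array[Array[Byte]], each element itself a serialized event. A self-contained round-trip sketch, using the same org.apache.spark.util.Utils helper as the test (Utils is Spark-internal, so the sketch assumes a package under org.apache.spark; String stands in for the event type):

package org.apache.spark.streaming

import java.nio.ByteBuffer
import org.apache.spark.util.Utils

object BatchFramingSketch {
  def main(args: Array[String]): Unit = {
    val events = Seq("event-1", "event-2", "event-3")

    // Writing side: serialize each event, then serialize the whole batch
    // into a single WAL record.
    val batch: Array[Array[Byte]] = events.map(e => Utils.serialize(e)).toArray
    val record: ByteBuffer = ByteBuffer.wrap(Utils.serialize(batch))

    // Reading side: the inverse, mirroring the flatMap in the test above.
    val decoded = Utils.deserialize[Array[Array[Byte]]](record.array())
      .map(bytes => Utils.deserialize[String](bytes))
    assert(decoded.toSeq == events)
  }
}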
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 4273fd7dda..7f80d6ecdb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -20,7 +20,7 @@ import java.io._
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
+import java.util.concurrent.{RejectedExecutionException, TimeUnit, CountDownLatch, ThreadPoolExecutor}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -190,6 +190,28 @@ abstract class CommonWriteAheadLogTests(
     }
     assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
   }
+
+  test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") {
+    // write some data
+    val writtenData = (1 to 10).map { i =>
+      val data = generateRandomData()
+      val file = testDir + s"/log-$i-$i"
+      writeDataManually(data, file, allowBatching)
+      data
+    }.flatten
+
+    val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching)
+    // create the iterator but don't materialize it yet
+    val readData = wal.readAll().asScala.map(byteBufferToString)
+    wal.close()
+    if (closeFileAfterWrite) {
+      // the thread pool is shut down by the wal.close() call above, so we shouldn't
+      // be able to materialize the iterator with parallel recovery
+      intercept[RejectedExecutionException](readData.toArray)
+    } else {
+      assert(readData.toSeq === writtenData)
+    }
+  }
 }
 
 class FileBasedWriteAheadLogSuite
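
The RejectedExecutionException branch above relies on standard ExecutorService semantics rather than anything WAL-specific: once a pool is shut down, it rejects newly submitted tasks. A standalone illustration:

import java.util.concurrent.{Executors, RejectedExecutionException}

object ShutdownPoolSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    pool.shutdown() // analogous to wal.close() shutting down the recovery pool

    try {
      // Any task submitted after shutdown is rejected, which is what the
      // lazily materialized iterator trips over in the test.
      pool.submit(new Runnable { override def run(): Unit = () })
      assert(false, "expected RejectedExecutionException")
    } catch {
      case _: RejectedExecutionException => // expected
    }
  }
}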
class FileBasedWriteAheadLogSuite
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
index 9152728191..bfc5b0cf60 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
@@ -56,19 +56,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
   test("log selection and creation") {
 
     val emptyConf = new SparkConf()  // no log configuration
-    assertDriverLogClass[FileBasedWriteAheadLog](emptyConf)
+    assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched = true)
     assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf)
 
     // Verify setting driver WAL class
     val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
       classOf[MockWriteAheadLog0].getName())
-    assertDriverLogClass[MockWriteAheadLog0](driverWALConf)
+    assertDriverLogClass[MockWriteAheadLog0](driverWALConf, isBatched = true)
     assertReceiverLogClass[FileBasedWriteAheadLog](driverWALConf)
 
     // Verify setting receiver WAL class
     val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
       classOf[MockWriteAheadLog0].getName())
-    assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf)
+    assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
     assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
 
     // Verify setting receiver WAL class with 1-arg constructor
@@ -104,6 +104,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
     assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
     assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
   }
+
+  test("batching is enabled by default in WriteAheadLog") {
+    val conf = new SparkConf()
+    assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true))
+    // batching is not valid for receiver WALs
+    assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false))
+  }
+
+  test("closeFileAfterWrite is disabled by default in WriteAheadLog") {
+    val conf = new SparkConf()
+    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true))
+    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false))
+  }
 }
 
 object WriteAheadLogUtilsSuite {
object WriteAheadLogUtilsSuite {