path: root/streaming/src/test
author     Tathagata Das <tathagata.das1565@gmail.com>    2015-04-29 13:06:11 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2015-04-29 13:06:11 -0700
commit     1868bd40dcce23990b98748b0239bd00452b1ca5 (patch)
tree       b350c7f739a52e322f967194814aa433025ecbb4 /streaming/src/test
parent     c0c0ba6d2a11febc8e874c437c4f676dd36ae059 (diff)
[SPARK-7056] [STREAMING] Make the Write Ahead Log pluggable
Users may want the WAL data to be written to non-HDFS data storage systems. To allow that, we have to make the WAL pluggable. The following design doc outlines the plan.

https://docs.google.com/a/databricks.com/document/d/1A2XaOLRFzvIZSi18i_luNw5Rmm9j2j4AigktXxIYxmY/edit?usp=sharing

Things to add.
* Unit tests for WriteAheadLogUtils

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5645 from tdas/wal-pluggable and squashes the following commits:

2c431fd [Tathagata Das] Minor fixes.
c2bc7384 [Tathagata Das] More changes based on PR comments.
569a416 [Tathagata Das] fixed long line
bde26b1 [Tathagata Das] Renamed segment to record handle everywhere
b65e155 [Tathagata Das] More changes based on PR comments.
d7cd15b [Tathagata Das] Fixed test
1a32a4b [Tathagata Das] Fixed test
e0d19fb [Tathagata Das] Fixed defaults
9310cbf [Tathagata Das] style fix.
86abcb1 [Tathagata Das] Refactored WriteAheadLogUtils, and consolidated all WAL related configuration into it.
84ce469 [Tathagata Das] Added unit test and fixed compilation error.
bce5e75 [Tathagata Das] Fixed long lines.
837c4f5 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
754fbf8 [Tathagata Das] Added license and docs.
09bc6fe [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into wal-pluggable
7dd2d4b [Tathagata Das] Added pluggable WriteAheadLog interface, and refactored all code along with it
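For context, below is a minimal sketch (not part of this commit) of what plugging in a custom WAL looks like under this design. Only the WriteAheadLog and WriteAheadLogRecordHandle classes, WriteAheadLogUtils.createLogForDriver, and the spark.streaming.driver.writeAheadLog.class key come from the patch itself; the InMemoryWriteAheadLog and InMemoryRecordHandle names are illustrative. It mirrors the JavaWriteAheadLogSuite added in the diff, but in Scala:

// Illustrative sketch of a pluggable WAL; names other than the Spark API are assumptions.
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle, WriteAheadLogUtils}

// Handle that remembers where a record lives in the in-memory buffer.
class InMemoryRecordHandle(val index: Int) extends WriteAheadLogRecordHandle

// WAL that keeps records in memory instead of HDFS; it needs a no-arg (or
// SparkConf-only) constructor so WriteAheadLogUtils can instantiate it reflectively.
class InMemoryWriteAheadLog extends WriteAheadLog {
  private val records = new ArrayBuffer[(Long, ByteBuffer)]()

  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
    records += ((time, record))
    new InMemoryRecordHandle(records.size - 1)
  }

  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer =
    records(handle.asInstanceOf[InMemoryRecordHandle].index)._2

  override def readAll(): JIterator[ByteBuffer] =
    records.map(_._2).iterator.asJava

  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit =
    records --= records.filter(_._1 < threshTime)

  override def close(): Unit = records.clear()
}

object CustomWALExample {
  def main(args: Array[String]): Unit = {
    // Selecting the custom class is purely a matter of configuration.
    val conf = new SparkConf()
      .set("spark.streaming.driver.writeAheadLog.class", classOf[InMemoryWriteAheadLog].getName)
    val wal = WriteAheadLogUtils.createLogForDriver(conf, null, null)

    val handle = wal.write(ByteBuffer.wrap("data1".getBytes), 1234L)
    println(new String(wal.read(handle).array()))   // prints "data1"
    wal.close()
  }
}

When the configuration key is unset, WriteAheadLogUtils falls back to the default FileBasedWriteAheadLog, as exercised by the new "WriteAheadLogUtils - log selection and creation" test in WriteAheadLogSuite below.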
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java                    129
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala                18
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala                28
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala                             2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala                     4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala      31
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala                  194
7 files changed, 311 insertions, 95 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
new file mode 100644
index 0000000000..50e8f9fc15
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Transformer;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.util.WriteAheadLog;
+import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;
+import org.apache.spark.streaming.util.WriteAheadLogUtils;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+class JavaWriteAheadLogSuiteHandle extends WriteAheadLogRecordHandle {
+ int index = -1;
+ public JavaWriteAheadLogSuiteHandle(int idx) {
+ index = idx;
+ }
+}
+
+public class JavaWriteAheadLogSuite extends WriteAheadLog {
+
+ class Record {
+ long time;
+ int index;
+ ByteBuffer buffer;
+
+ public Record(long tym, int idx, ByteBuffer buf) {
+ index = idx;
+ time = tym;
+ buffer = buf;
+ }
+ }
+ private int index = -1;
+ private ArrayList<Record> records = new ArrayList<Record>();
+
+
+ // Methods for WriteAheadLog
+ @Override
+ public WriteAheadLogRecordHandle write(java.nio.ByteBuffer record, long time) {
+ index += 1;
+ records.add(new org.apache.spark.streaming.JavaWriteAheadLogSuite.Record(time, index, record));
+ return new JavaWriteAheadLogSuiteHandle(index);
+ }
+
+ @Override
+ public java.nio.ByteBuffer read(WriteAheadLogRecordHandle handle) {
+ if (handle instanceof JavaWriteAheadLogSuiteHandle) {
+ int reqdIndex = ((JavaWriteAheadLogSuiteHandle) handle).index;
+ for (Record record: records) {
+ if (record.index == reqdIndex) {
+ return record.buffer;
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public java.util.Iterator<java.nio.ByteBuffer> readAll() {
+ Collection<ByteBuffer> buffers = CollectionUtils.collect(records, new Transformer() {
+ @Override
+ public Object transform(Object input) {
+ return ((Record) input).buffer;
+ }
+ });
+ return buffers.iterator();
+ }
+
+ @Override
+ public void clean(long threshTime, boolean waitForCompletion) {
+ for (int i = 0; i < records.size(); i++) {
+ if (records.get(i).time < threshTime) {
+ records.remove(i);
+ i--;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ records.clear();
+ }
+
+ @Test
+ public void testCustomWAL() {
+ SparkConf conf = new SparkConf();
+ conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
+ WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);
+
+ String data1 = "data1";
+ WriteAheadLogRecordHandle handle = wal.write(ByteBuffer.wrap(data1.getBytes()), 1234);
+ Assert.assertTrue(handle instanceof JavaWriteAheadLogSuiteHandle);
+ Assert.assertTrue(new String(wal.read(handle).array()).equals(data1));
+
+ wal.write(ByteBuffer.wrap("data2".getBytes()), 1235);
+ wal.write(ByteBuffer.wrap("data3".getBytes()), 1236);
+ wal.write(ByteBuffer.wrap("data4".getBytes()), 1237);
+ wal.clean(1236, false);
+
+ java.util.Iterator<java.nio.ByteBuffer> dataIterator = wal.readAll();
+ ArrayList<String> readData = new ArrayList<String>();
+ while (dataIterator.hasNext()) {
+ readData.add(new String(dataIterator.next().array()));
+ }
+ Assert.assertTrue(readData.equals(Arrays.asList("data3", "data4")));
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index c090eaec29..23804237bd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -43,7 +43,7 @@ import WriteAheadLogSuite._
class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
- val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+ val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
val hadoopConf = new Configuration()
val storageLevel = StorageLevel.MEMORY_ONLY_SER
val streamId = 1
@@ -130,10 +130,13 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
"Unexpected store result type"
)
// Verify the data in write ahead log files is correct
- val fileSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment}
- val loggedData = fileSegments.flatMap { segment =>
- val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf)
- val bytes = reader.read(segment)
+ val walSegments = storeResults.map { result =>
+ result.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
+ }
+ val loggedData = walSegments.flatMap { walSegment =>
+ val fileSegment = walSegment.asInstanceOf[FileBasedWriteAheadLogSegment]
+ val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
+ val bytes = reader.read(fileSegment)
reader.close()
blockManager.dataDeserialize(generateBlockId(), bytes).toList
}
@@ -148,13 +151,13 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
}
}
- test("WriteAheadLogBasedBlockHandler - cleanup old blocks") {
+ test("WriteAheadLogBasedBlockHandler - clean old blocks") {
withWriteAheadLogBasedBlockHandler { handler =>
val blocks = Seq.tabulate(10) { i => IteratorBlock(Iterator(1 to i)) }
storeBlocks(handler, blocks)
val preCleanupLogFiles = getWriteAheadLogFiles()
- preCleanupLogFiles.size should be > 1
+ require(preCleanupLogFiles.size > 1)
// this depends on the number of blocks inserted using generateAndStoreData()
manualClock.getTimeMillis() shouldEqual 5000L
@@ -218,6 +221,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
/** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */
private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) {
+ require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = false) === 1)
val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
try {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index b63b37d9f9..8317fb9720 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.WriteAheadLogReader
+import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -59,7 +59,7 @@ class ReceivedBlockTrackerSuite
test("block addition, and block to batch allocation") {
val receivedBlockTracker = createTracker(setCheckpointDir = false)
- receivedBlockTracker.isLogManagerEnabled should be (false) // should be disable by default
+ receivedBlockTracker.isWriteAheadLogEnabled should be (false) // should be disable by default
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
val blockInfos = generateBlockInfos()
@@ -88,7 +88,7 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}
- test("block addition, block to batch allocation and cleanup with write ahead log") {
+ test("block addition, block to batch allocation and clean up with write ahead log") {
val manualClock = new ManualClock
// Set the time increment level to twice the rotation interval so that every increment creates
// a new log file
@@ -113,11 +113,15 @@ class ReceivedBlockTrackerSuite
logInfo(s"\n\n=====================\n$message\n$fileContents\n=====================\n")
}
- // Start tracker and add blocks
+ // Set WAL configuration
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
- conf.set("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", "1")
+ conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
+ require(WriteAheadLogUtils.enableReceiverLog(conf))
+ require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
+
+ // Start tracker and add blocks
val tracker1 = createTracker(clock = manualClock)
- tracker1.isLogManagerEnabled should be (true)
+ tracker1.isWriteAheadLogEnabled should be (true)
val blockInfos1 = addBlockInfos(tracker1)
tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
@@ -171,7 +175,7 @@ class ReceivedBlockTrackerSuite
eventually(timeout(10 seconds), interval(10 millisecond)) {
getWriteAheadLogFiles() should not contain oldestLogFile
}
- printLogFiles("After cleanup")
+ printLogFiles("After clean")
// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
@@ -192,17 +196,17 @@ class ReceivedBlockTrackerSuite
test("setting checkpoint dir but not enabling write ahead log") {
// When WAL config is not set, log manager should not be enabled
val tracker1 = createTracker(setCheckpointDir = true)
- tracker1.isLogManagerEnabled should be (false)
+ tracker1.isWriteAheadLogEnabled should be (false)
// When WAL is explicitly disabled, log manager should not be enabled
conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
val tracker2 = createTracker(setCheckpointDir = true)
- tracker2.isLogManagerEnabled should be(false)
+ tracker2.isWriteAheadLogEnabled should be(false)
}
/**
* Create tracker object with the optional provided clock. Use fake clock if you
- * want to control time by manually incrementing it to test log cleanup.
+ * want to control time by manually incrementing it to test log clean.
*/
def createTracker(
setCheckpointDir: Boolean = true,
@@ -231,7 +235,7 @@ class ReceivedBlockTrackerSuite
def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles)
: Seq[ReceivedBlockTrackerLogEvent] = {
logFiles.flatMap {
- file => new WriteAheadLogReader(file, hadoopConf).toSeq
+ file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
}.map { byteBuffer =>
Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
}.toList
@@ -250,7 +254,7 @@ class ReceivedBlockTrackerSuite
BatchAllocationEvent(time, AllocatedBlocks(Map((streamId -> blockInfos))))
}
- /** Create batch cleanup object from the given info */
+ /** Create batch clean object from the given info */
def createBatchCleanup(time: Long, moreTimes: Long*): BatchCleanupEvent = {
BatchCleanupEvent((Seq(time) ++ moreTimes).map(Time.apply))
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index b84129fd70..393a360cfe 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -225,7 +225,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
.setAppName(framework)
.set("spark.ui.enabled", "true")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
- .set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+ .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
val batchDuration = Milliseconds(500)
val tempDirectory = Utils.createTempDir()
val logDirectory1 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 0))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 58353a5f97..09440b1e79 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -363,7 +363,7 @@ class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging
}
def onStop() {
- // no cleanup to be done, the receiving thread should stop on it own
+ // no clean to be done, the receiving thread should stop on it own
}
}
@@ -396,7 +396,7 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
def onStop() {
// Simulate slow receiver by waiting for all records to be produced
while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
- // no cleanup to be done, the receiving thread should stop on it own
+ // no clean to be done, the receiving thread should stop on it own
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index c3602a5b73..8b300d8dd3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -21,12 +21,12 @@ import java.io.File
import scala.util.Random
import org.apache.hadoop.conf.Configuration
-import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
+import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
import org.apache.spark.util.Utils
+import org.apache.spark.{SparkConf, SparkContext}
class WriteAheadLogBackedBlockRDDSuite
extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
@@ -100,9 +100,10 @@ class WriteAheadLogBackedBlockRDDSuite
blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
}
- // Generate write ahead log segments
- val segments = generateFakeSegments(numPartitionsInBM) ++
- writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))
+ // Generate write ahead log file segments
+ val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++
+ generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
+ blockIds.takeRight(numPartitionsInWAL))
// Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
require(
@@ -116,24 +117,24 @@ class WriteAheadLogBackedBlockRDDSuite
// Make sure that the right `numPartitionsInWAL` blocks are in WALs, and other are not
require(
- segments.takeRight(numPartitionsInWAL).forall(s =>
+ recordHandles.takeRight(numPartitionsInWAL).forall(s =>
new File(s.path.stripPrefix("file://")).exists()),
"Expected blocks not in write ahead log"
)
require(
- segments.take(numPartitionsInBM).forall(s =>
+ recordHandles.take(numPartitionsInBM).forall(s =>
!new File(s.path.stripPrefix("file://")).exists()),
"Unexpected blocks in write ahead log"
)
// Create the RDD and verify whether the returned data is correct
val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
- segments.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
+ recordHandles.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
assert(rdd.collect() === data.flatten)
if (testStoreInBM) {
val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
- segments.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
+ recordHandles.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
assert(rdd2.collect() === data.flatten)
assert(
blockIds.forall(blockManager.get(_).nonEmpty),
@@ -142,12 +143,12 @@ class WriteAheadLogBackedBlockRDDSuite
}
}
- private def writeLogSegments(
+ private def generateWALRecordHandles(
blockData: Seq[Seq[String]],
blockIds: Seq[BlockId]
- ): Seq[WriteAheadLogFileSegment] = {
+ ): Seq[FileBasedWriteAheadLogSegment] = {
require(blockData.size === blockIds.size)
- val writer = new WriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
+ val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
val segments = blockData.zip(blockIds).map { case (data, id) =>
writer.write(blockManager.dataSerialize(id, data.iterator))
}
@@ -155,7 +156,7 @@ class WriteAheadLogBackedBlockRDDSuite
segments
}
- private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = {
- Array.fill(count)(new WriteAheadLogFileSegment("random", 0L, 0))
+ private def generateFakeRecordHandles(count: Int): Seq[FileBasedWriteAheadLogSegment] = {
+ Array.fill(count)(new FileBasedWriteAheadLogSegment("random", 0L, 0))
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index a3919c43b9..79098bcf48 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -18,33 +18,38 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer
+import java.util
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
+import scala.reflect.ClassTag
-import WriteAheadLogSuite._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.spark.util.{ManualClock, Utils}
-import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Eventually._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.{SparkConf, SparkException}
class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+ import WriteAheadLogSuite._
+
val hadoopConf = new Configuration()
var tempDir: File = null
var testDir: String = null
var testFile: String = null
- var manager: WriteAheadLogManager = null
+ var writeAheadLog: FileBasedWriteAheadLog = null
before {
tempDir = Utils.createTempDir()
testDir = tempDir.toString
testFile = new File(tempDir, "testFile").toString
- if (manager != null) {
- manager.stop()
- manager = null
+ if (writeAheadLog != null) {
+ writeAheadLog.close()
+ writeAheadLog = null
}
}
@@ -52,16 +57,60 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
Utils.deleteRecursively(tempDir)
}
- test("WriteAheadLogWriter - writing data") {
+ test("WriteAheadLogUtils - log selection and creation") {
+ val logDir = Utils.createTempDir().getAbsolutePath()
+
+ def assertDriverLogClass[T <: WriteAheadLog: ClassTag](conf: SparkConf): WriteAheadLog = {
+ val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
+ assert(log.getClass === implicitly[ClassTag[T]].runtimeClass)
+ log
+ }
+
+ def assertReceiverLogClass[T: ClassTag](conf: SparkConf): WriteAheadLog = {
+ val log = WriteAheadLogUtils.createLogForReceiver(conf, logDir, hadoopConf)
+ assert(log.getClass === implicitly[ClassTag[T]].runtimeClass)
+ log
+ }
+
+ val emptyConf = new SparkConf() // no log configuration
+ assertDriverLogClass[FileBasedWriteAheadLog](emptyConf)
+ assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf)
+
+ // Verify setting driver WAL class
+ val conf1 = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
+ classOf[MockWriteAheadLog0].getName())
+ assertDriverLogClass[MockWriteAheadLog0](conf1)
+ assertReceiverLogClass[FileBasedWriteAheadLog](conf1)
+
+ // Verify setting receiver WAL class
+ val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
+ classOf[MockWriteAheadLog0].getName())
+ assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf)
+ assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
+
+ // Verify setting receiver WAL class with 1-arg constructor
+ val receiverWALConf2 = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
+ classOf[MockWriteAheadLog1].getName())
+ assertReceiverLogClass[MockWriteAheadLog1](receiverWALConf2)
+
+ // Verify failure setting receiver WAL class with 2-arg constructor
+ intercept[SparkException] {
+ val receiverWALConf3 = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
+ classOf[MockWriteAheadLog2].getName())
+ assertReceiverLogClass[MockWriteAheadLog1](receiverWALConf3)
+ }
+ }
+
+ test("FileBasedWriteAheadLogWriter - writing data") {
val dataToWrite = generateRandomData()
val segments = writeDataUsingWriter(testFile, dataToWrite)
val writtenData = readDataManually(segments)
assert(writtenData === dataToWrite)
}
- test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+ test("FileBasedWriteAheadLogWriter - syncing of data by writing and reading immediately") {
val dataToWrite = generateRandomData()
- val writer = new WriteAheadLogWriter(testFile, hadoopConf)
+ val writer = new FileBasedWriteAheadLogWriter(testFile, hadoopConf)
dataToWrite.foreach { data =>
val segment = writer.write(stringToByteBuffer(data))
val dataRead = readDataManually(Seq(segment)).head
@@ -70,10 +119,10 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
writer.close()
}
- test("WriteAheadLogReader - sequentially reading data") {
+ test("FileBasedWriteAheadLogReader - sequentially reading data") {
val writtenData = generateRandomData()
writeDataManually(writtenData, testFile)
- val reader = new WriteAheadLogReader(testFile, hadoopConf)
+ val reader = new FileBasedWriteAheadLogReader(testFile, hadoopConf)
val readData = reader.toSeq.map(byteBufferToString)
assert(readData === writtenData)
assert(reader.hasNext === false)
@@ -83,14 +132,14 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
reader.close()
}
- test("WriteAheadLogReader - sequentially reading data written with writer") {
+ test("FileBasedWriteAheadLogReader - sequentially reading data written with writer") {
val dataToWrite = generateRandomData()
writeDataUsingWriter(testFile, dataToWrite)
val readData = readDataUsingReader(testFile)
assert(readData === dataToWrite)
}
- test("WriteAheadLogReader - reading data written with writer after corrupted write") {
+ test("FileBasedWriteAheadLogReader - reading data written with writer after corrupted write") {
// Write data manually for testing the sequential reader
val dataToWrite = generateRandomData()
writeDataUsingWriter(testFile, dataToWrite)
@@ -113,38 +162,38 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
}
- test("WriteAheadLogRandomReader - reading data using random reader") {
+ test("FileBasedWriteAheadLogRandomReader - reading data using random reader") {
// Write data manually for testing the random reader
val writtenData = generateRandomData()
val segments = writeDataManually(writtenData, testFile)
// Get a random order of these segments and read them back
val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
- val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+ val reader = new FileBasedWriteAheadLogRandomReader(testFile, hadoopConf)
writtenDataAndSegments.foreach { case (data, segment) =>
assert(data === byteBufferToString(reader.read(segment)))
}
reader.close()
}
- test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
+ test("FileBasedWriteAheadLogRandomReader- reading data using random reader written with writer") {
// Write data using writer for testing the random reader
val data = generateRandomData()
val segments = writeDataUsingWriter(testFile, data)
// Read a random sequence of segments and verify read data
val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
- val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+ val reader = new FileBasedWriteAheadLogRandomReader(testFile, hadoopConf)
dataAndSegments.foreach { case (data, segment) =>
assert(data === byteBufferToString(reader.read(segment)))
}
reader.close()
}
- test("WriteAheadLogManager - write rotating logs") {
- // Write data using manager
+ test("FileBasedWriteAheadLog - write rotating logs") {
+ // Write data with rotation using WriteAheadLog class
val dataToWrite = generateRandomData()
- writeDataUsingManager(testDir, dataToWrite)
+ writeDataUsingWriteAheadLog(testDir, dataToWrite)
// Read data manually to verify the written data
val logFiles = getLogFilesInDirectory(testDir)
@@ -153,8 +202,8 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
assert(writtenData === dataToWrite)
}
- test("WriteAheadLogManager - read rotating logs") {
- // Write data manually for testing reading through manager
+ test("FileBasedWriteAheadLog - read rotating logs") {
+ // Write data manually for testing reading through WriteAheadLog
val writtenData = (1 to 10).map { i =>
val data = generateRandomData()
val file = testDir + s"/log-$i-$i"
@@ -167,25 +216,25 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
assert(fileSystem.exists(logDirectoryPath) === true)
// Read data using manager and verify
- val readData = readDataUsingManager(testDir)
+ val readData = readDataUsingWriteAheadLog(testDir)
assert(readData === writtenData)
}
- test("WriteAheadLogManager - recover past logs when creating new manager") {
+ test("FileBasedWriteAheadLog - recover past logs when creating new manager") {
// Write data with manager, recover with new manager and verify
val dataToWrite = generateRandomData()
- writeDataUsingManager(testDir, dataToWrite)
+ writeDataUsingWriteAheadLog(testDir, dataToWrite)
val logFiles = getLogFilesInDirectory(testDir)
assert(logFiles.size > 1)
- val readData = readDataUsingManager(testDir)
+ val readData = readDataUsingWriteAheadLog(testDir)
assert(dataToWrite === readData)
}
- test("WriteAheadLogManager - cleanup old logs") {
+ test("FileBasedWriteAheadLog - clean old logs") {
logCleanUpTest(waitForCompletion = false)
}
- test("WriteAheadLogManager - cleanup old logs synchronously") {
+ test("FileBasedWriteAheadLog - clean old logs synchronously") {
logCleanUpTest(waitForCompletion = true)
}
@@ -193,11 +242,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
// Write data with manager, recover with new manager and verify
val manualClock = new ManualClock
val dataToWrite = generateRandomData()
- manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
+ writeAheadLog = writeDataUsingWriteAheadLog(testDir, dataToWrite, manualClock, closeLog = false)
val logFiles = getLogFilesInDirectory(testDir)
assert(logFiles.size > 1)
- manager.cleanupOldLogs(manualClock.getTimeMillis() / 2, waitForCompletion)
+ writeAheadLog.clean(manualClock.getTimeMillis() / 2, waitForCompletion)
if (waitForCompletion) {
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
@@ -208,11 +257,11 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
}
}
- test("WriteAheadLogManager - handling file errors while reading rotating logs") {
+ test("FileBasedWriteAheadLog - handling file errors while reading rotating logs") {
// Generate a set of log files
val manualClock = new ManualClock
val dataToWrite1 = generateRandomData()
- writeDataUsingManager(testDir, dataToWrite1, manualClock)
+ writeDataUsingWriteAheadLog(testDir, dataToWrite1, manualClock)
val logFiles1 = getLogFilesInDirectory(testDir)
assert(logFiles1.size > 1)
@@ -220,12 +269,12 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
// Recover old files and generate a second set of log files
val dataToWrite2 = generateRandomData()
manualClock.advance(100000)
- writeDataUsingManager(testDir, dataToWrite2, manualClock)
+ writeDataUsingWriteAheadLog(testDir, dataToWrite2, manualClock)
val logFiles2 = getLogFilesInDirectory(testDir)
assert(logFiles2.size > logFiles1.size)
// Read the files and verify that all the written data can be read
- val readData1 = readDataUsingManager(testDir)
+ val readData1 = readDataUsingWriteAheadLog(testDir)
assert(readData1 === (dataToWrite1 ++ dataToWrite2))
// Corrupt the first set of files so that they are basically unreadable
@@ -236,25 +285,51 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
}
// Verify that the corrupted files do not prevent reading of the second set of data
- val readData = readDataUsingManager(testDir)
+ val readData = readDataUsingWriteAheadLog(testDir)
assert(readData === dataToWrite2)
}
+
+ test("FileBasedWriteAheadLog - do not create directories or files unless write") {
+ val nonexistentTempPath = File.createTempFile("test", "")
+ nonexistentTempPath.delete()
+ assert(!nonexistentTempPath.exists())
+
+ val writtenSegment = writeDataManually(generateRandomData(), testFile)
+ val wal = new FileBasedWriteAheadLog(
+ new SparkConf(), tempDir.getAbsolutePath, new Configuration(), 1, 1)
+ assert(!nonexistentTempPath.exists(), "Directory created just by creating log object")
+ wal.read(writtenSegment.head)
+ assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
+ }
}
object WriteAheadLogSuite {
+ class MockWriteAheadLog0() extends WriteAheadLog {
+ override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = { null }
+ override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = { null }
+ override def readAll(): util.Iterator[ByteBuffer] = { null }
+ override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = { }
+ override def close(): Unit = { }
+ }
+
+ class MockWriteAheadLog1(val conf: SparkConf) extends MockWriteAheadLog0()
+
+ class MockWriteAheadLog2(val conf: SparkConf, x: Int) extends MockWriteAheadLog0()
+
+
private val hadoopConf = new Configuration()
/** Write data to a file directly and return an array of the file segments written. */
- def writeDataManually(data: Seq[String], file: String): Seq[WriteAheadLogFileSegment] = {
- val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
+ def writeDataManually(data: Seq[String], file: String): Seq[FileBasedWriteAheadLogSegment] = {
+ val segments = new ArrayBuffer[FileBasedWriteAheadLogSegment]()
val writer = HdfsUtils.getOutputStream(file, hadoopConf)
data.foreach { item =>
val offset = writer.getPos
val bytes = Utils.serialize(item)
writer.writeInt(bytes.size)
writer.write(bytes)
- segments += WriteAheadLogFileSegment(file, offset, bytes.size)
+ segments += FileBasedWriteAheadLogSegment(file, offset, bytes.size)
}
writer.close()
segments
@@ -263,8 +338,11 @@ object WriteAheadLogSuite {
/**
* Write data to a file using the writer class and return an array of the file segments written.
*/
- def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[WriteAheadLogFileSegment] = {
- val writer = new WriteAheadLogWriter(filePath, hadoopConf)
+ def writeDataUsingWriter(
+ filePath: String,
+ data: Seq[String]
+ ): Seq[FileBasedWriteAheadLogSegment] = {
+ val writer = new FileBasedWriteAheadLogWriter(filePath, hadoopConf)
val segments = data.map {
item => writer.write(item)
}
@@ -272,27 +350,27 @@ object WriteAheadLogSuite {
segments
}
- /** Write data to rotating files in log directory using the manager class. */
- def writeDataUsingManager(
+ /** Write data to rotating files in log directory using the WriteAheadLog class. */
+ def writeDataUsingWriteAheadLog(
logDirectory: String,
data: Seq[String],
manualClock: ManualClock = new ManualClock,
- stopManager: Boolean = true
- ): WriteAheadLogManager = {
+ closeLog: Boolean = true
+ ): FileBasedWriteAheadLog = {
if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
- val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
- rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
+ val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+
// Ensure that 500 does not get sorted after 2000, so put a high base value.
data.foreach { item =>
manualClock.advance(500)
- manager.writeToLog(item)
+ wal.write(item, manualClock.getTimeMillis())
}
- if (stopManager) manager.stop()
- manager
+ if (closeLog) wal.close()
+ wal
}
/** Read data from a segments of a log file directly and return the list of byte buffers. */
- def readDataManually(segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
+ def readDataManually(segments: Seq[FileBasedWriteAheadLogSegment]): Seq[String] = {
segments.map { segment =>
val reader = HdfsUtils.getInputStream(segment.path, hadoopConf)
try {
@@ -331,18 +409,18 @@ object WriteAheadLogSuite {
/** Read all the data from a log file using reader class and return the list of byte buffers. */
def readDataUsingReader(file: String): Seq[String] = {
- val reader = new WriteAheadLogReader(file, hadoopConf)
+ val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
val readData = reader.toList.map(byteBufferToString)
reader.close()
readData
}
- /** Read all the data in the log file in a directory using the manager class. */
- def readDataUsingManager(logDirectory: String): Seq[String] = {
- val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
- callerName = "WriteAheadLogSuite")
- val data = manager.readFromLog().map(byteBufferToString).toSeq
- manager.stop()
+ /** Read all the data in the log file in a directory using the WriteAheadLog class. */
+ def readDataUsingWriteAheadLog(logDirectory: String): Seq[String] = {
+ import scala.collection.JavaConversions._
+ val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+ val data = wal.readAll().map(byteBufferToString).toSeq
+ wal.close()
data
}