author: Burak Yavuz <brkyvz@gmail.com> 2015-10-27 16:01:26 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com> 2015-10-27 16:01:26 -0700
commit: 4f030b9e82172659d250281782ac573cbd1438fc (patch)
tree: 335d4f235faefc33acf42e6498ceb5be0b7a633d /streaming/src/test
parent: 9dba5fb2b59174cefde5b62a5c892fe5925bea38 (diff)
[SPARK-11324][STREAMING] Flag for closing Write Ahead Logs after a write
Currently the Write Ahead Log in Spark Streaming flushes data as writes are made. S3 does not support flushing: data is persisted only once the stream is actually closed, so on failure the data written since the last roll (one minute by default) would be lost. Therefore we need a flag to close the stream after each write, so that we achieve read-after-write consistency. cc tdas zsxwing

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #9285 from brkyvz/caw-wal.
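As a rough sketch of what the new flag looks like at the call site (mirroring the test helpers changed below; note that FileBasedWriteAheadLog is internal to Spark Streaming, and the S3 path here is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.util.FileBasedWriteAheadLog

    // Illustrative only: FileBasedWriteAheadLog is private[streaming], so this
    // mirrors how the test suite below constructs it rather than a public API.
    val wal = new FileBasedWriteAheadLog(
      new SparkConf(),
      "s3a://bucket/spark/wal",   // hypothetical S3 log directory
      new Configuration(),
      1,                          // rolling interval in seconds
      1,                          // max failures before a write is abandoned
      closeFileAfterWrite = true) // close the stream after every write
    // With the flag set, each record lands in its own closed log file, so S3
    // can serve it back immediately (read-after-write consistency).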
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 32
1 file changed, 25 insertions(+), 7 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 5e49fd0076..93ae41a3d2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -203,6 +203,21 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
assert(writtenData === dataToWrite)
}
+ test("FileBasedWriteAheadLog - close after write flag") {
+ // Write data using the WriteAheadLog class with the closeFileAfterWrite flag
+ val numFiles = 3
+ val dataToWrite = Seq.tabulate(numFiles)(_.toString)
+ // Total advance time stays under the 1000 ms rolling interval, so the log is
+ // never rolled; each file is closed by the closeFileAfterWrite flag instead
+ writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, clockAdvanceTime = 100,
+ closeFileAfterWrite = true)
+
+ // Read data manually to verify the written data
+ val logFiles = getLogFilesInDirectory(testDir)
+ assert(logFiles.size === numFiles)
+ val writtenData = logFiles.flatMap { file => readDataManually(file)}
+ assert(writtenData === dataToWrite)
+ }
+
test("FileBasedWriteAheadLog - read rotating logs") {
// Write data manually for testing reading through WriteAheadLog
val writtenData = (1 to 10).map { i =>
@@ -296,8 +311,8 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
assert(!nonexistentTempPath.exists())
val writtenSegment = writeDataManually(generateRandomData(), testFile)
- val wal = new FileBasedWriteAheadLog(
- new SparkConf(), tempDir.getAbsolutePath, new Configuration(), 1, 1)
+ val wal = new FileBasedWriteAheadLog(new SparkConf(), tempDir.getAbsolutePath,
+ new Configuration(), 1, 1, closeFileAfterWrite = false)
assert(!nonexistentTempPath.exists(), "Directory created just by creating log object")
wal.read(writtenSegment.head)
assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
@@ -356,14 +371,16 @@ object WriteAheadLogSuite {
logDirectory: String,
data: Seq[String],
manualClock: ManualClock = new ManualClock,
- closeLog: Boolean = true
- ): FileBasedWriteAheadLog = {
+ closeLog: Boolean = true,
+ clockAdvanceTime: Int = 500,
+ closeFileAfterWrite: Boolean = false): FileBasedWriteAheadLog = {
if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
- val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+ val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1,
+ closeFileAfterWrite)
// Ensure that 500 does not get sorted after 2000, so put a high base value.
data.foreach { item =>
- manualClock.advance(500)
+ manualClock.advance(clockAdvanceTime)
wal.write(item, manualClock.getTimeMillis())
}
if (closeLog) wal.close()
@@ -418,7 +435,8 @@ object WriteAheadLogSuite {
/** Read all the data in the log file in a directory using the WriteAheadLog class. */
def readDataUsingWriteAheadLog(logDirectory: String): Seq[String] = {
- val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+ val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1,
+ closeFileAfterWrite = false)
val data = wal.readAll().asScala.map(byteBufferToString).toSeq
wal.close()
data
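The diffstat above is limited to streaming/src/test. Assuming the rest of the patch exposes the flag through Spark configuration (the keys below match the Spark Streaming documentation for this feature but are not visible in this test-only diff), an application would enable it roughly like:

    import org.apache.spark.SparkConf

    // Assumed configuration keys; verify against the non-test half of the patch.
    val conf = new SparkConf()
      .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
      .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")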