aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2015-11-12 18:03:23 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-12 18:03:23 -0800
commit7786f9cc0790d27854a1e184f66a9b4df4d040a2 (patch)
tree75c0e6a49514d55d85a3b63dc121d239cd6343bb /streaming
parent0f1d00a905614bb5eebf260566dbcb831158d445 (diff)
downloadspark-7786f9cc0790d27854a1e184f66a9b4df4d040a2.tar.gz
spark-7786f9cc0790d27854a1e184f66a9b4df4d040a2.tar.bz2
spark-7786f9cc0790d27854a1e184f66a9b4df4d040a2.zip
[SPARK-11419][STREAMING] Parallel recovery for FileBasedWriteAheadLog + minor recovery tweaks
The support for closing WriteAheadLog files after writes was just merged in. Closing every file after a write is a very expensive operation as it creates many small files on S3. It's not necessary to enable it on HDFS anyway. However, when you have many small files on S3, recovery takes very long. In addition, files start stacking up pretty quickly, and deletes may not be able to keep up, therefore deletes can also be parallelized. This PR adds support for the two parallelization steps mentioned above, in addition to a couple more failures I encountered during recovery. Author: Burak Yavuz <brkyvz@gmail.com> Closes #9373 from brkyvz/par-recovery.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala78
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala17
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala24
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala91
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala87
7 files changed, 268 insertions, 37 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 2480b4ec09..1ed6fb0aa9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
if (eventLoop == null) return // scheduler has already been stopped
logDebug("Stopping JobScheduler")
- // First, stop receiving
- receiverTracker.stop(processAllReceivedData)
+ if (receiverTracker != null) {
+ // First, stop receiving
+ receiverTracker.stop(processAllReceivedData)
+ }
// Second, stop generating jobs. If it has to process all received data,
// then this will wait for all the processing through JobScheduler to be over.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index bc3f2486c2..72705f1a9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,10 +17,12 @@
package org.apache.spark.streaming.util
import java.nio.ByteBuffer
+import java.util.concurrent.ThreadPoolExecutor
import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.ThreadPoolTaskSupport
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps
@@ -57,8 +59,8 @@ private[streaming] class FileBasedWriteAheadLog(
private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")
private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
- implicit private val executionContext = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
+ private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 20)
+ private val executionContext = ExecutionContext.fromExecutorService(threadpool)
override protected val logName = s"WriteAheadLogManager $callerNameTag"
private var currentLogPath: Option[String] = None
@@ -124,13 +126,19 @@ private[streaming] class FileBasedWriteAheadLog(
*/
def readAll(): JIterator[ByteBuffer] = synchronized {
val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
- logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
-
- logFilesToRead.iterator.map { file =>
+ logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
+ def readFile(file: String): Iterator[ByteBuffer] = {
logDebug(s"Creating log reader with $file")
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
- }.flatten.asJava
+ }
+ if (!closeFileAfterWrite) {
+ logFilesToRead.iterator.map(readFile).flatten.asJava
+ } else {
+ // For performance gains, it makes sense to parallelize the recovery if
+ // closeFileAfterWrite = true
+ seqToParIterator(threadpool, logFilesToRead, readFile).asJava
+ }
}
/**
@@ -146,30 +154,33 @@ private[streaming] class FileBasedWriteAheadLog(
* asynchronously.
*/
def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
- val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+ val oldLogFiles = synchronized {
+ val expiredLogs = pastLogs.filter { _.endTime < threshTime }
+ pastLogs --= expiredLogs
+ expiredLogs
+ }
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
- def deleteFiles() {
- oldLogFiles.foreach { logInfo =>
- try {
- val path = new Path(logInfo.path)
- val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
- fs.delete(path, true)
- synchronized { pastLogs -= logInfo }
- logDebug(s"Cleared log file $logInfo")
- } catch {
- case ex: Exception =>
- logWarning(s"Error clearing write ahead log file $logInfo", ex)
- }
+ def deleteFile(walInfo: LogInfo): Unit = {
+ try {
+ val path = new Path(walInfo.path)
+ val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+ fs.delete(path, true)
+ logDebug(s"Cleared log file $walInfo")
+ } catch {
+ case ex: Exception =>
+ logWarning(s"Error clearing write ahead log file $walInfo", ex)
}
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
}
- if (!executionContext.isShutdown) {
- val f = Future { deleteFiles() }
- if (waitForCompletion) {
- import scala.concurrent.duration._
- Await.ready(f, 1 second)
+ oldLogFiles.foreach { logInfo =>
+ if (!executionContext.isShutdown) {
+ val f = Future { deleteFile(logInfo) }(executionContext)
+ if (waitForCompletion) {
+ import scala.concurrent.duration._
+ Await.ready(f, 1 second)
+ }
}
}
}
@@ -251,4 +262,23 @@ private[streaming] object FileBasedWriteAheadLog {
}
}.sortBy { _.startTime }
}
+
+ /**
+ * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
+ * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
+ * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
+ * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
+ */
+ def seqToParIterator[I, O](
+ tpool: ThreadPoolExecutor,
+ source: Seq[I],
+ handler: I => Iterator[O]): Iterator[O] = {
+ val taskSupport = new ThreadPoolTaskSupport(tpool)
+ val groupSize = tpool.getMaximumPoolSize.max(8)
+ source.grouped(groupSize).flatMap { group =>
+ val parallelCollection = group.par
+ parallelCollection.tasksupport = taskSupport
+ parallelCollection.map(handler)
+ }.flatten
+ }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
index f7168229ec..56d4977da0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
@@ -30,7 +30,7 @@ private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf:
extends Closeable {
private val instream = HdfsUtils.getInputStream(path, conf)
- private var closed = false
+ private var closed = (instream == null) // the file may be deleted as we're opening the stream
def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
assertOpen()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
index c3bb59f3fe..a375c07295 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.util
-import java.io.{Closeable, EOFException}
+import java.io.{IOException, Closeable, EOFException}
import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
@@ -32,7 +32,7 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
extends Iterator[ByteBuffer] with Closeable with Logging {
private val instream = HdfsUtils.getInputStream(path, conf)
- private var closed = false
+ private var closed = (instream == null) // the file may be deleted as we're opening the stream
private var nextItem: Option[ByteBuffer] = None
override def hasNext: Boolean = synchronized {
@@ -55,6 +55,19 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
logDebug("Error reading next item, EOF reached", e)
close()
false
+ case e: IOException =>
+ logWarning("Error while trying to read data. If the file was deleted, " +
+ "this should be okay.", e)
+ close()
+ if (HdfsUtils.checkFileExists(path, conf)) {
+ // If file exists, this could be a legitimate error
+ throw e
+ } else {
+ // File was deleted. This can occur when the daemon cleanup thread takes time to
+ // delete the file during recovery.
+ false
+ }
+
case e: Exception =>
logWarning("Error while trying to read data from HDFS.", e)
close()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index f60688f173..13a765d035 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.streaming.util
+import java.io.IOException
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -42,8 +44,19 @@ private[streaming] object HdfsUtils {
def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
- val instream = dfs.open(dfsPath)
- instream
+ if (dfs.isFile(dfsPath)) {
+ try {
+ dfs.open(dfsPath)
+ } catch {
+ case e: IOException =>
+ // If we are really unlucky, the file may be deleted as we're opening the stream.
+ // This can happen as clean up is performed by daemon threads that may be left over from
+ // previous runs.
+ if (!dfs.isFile(dfsPath)) null else throw e
+ }
+ } else {
+ null
+ }
}
def checkState(state: Boolean, errorMsg: => String) {
@@ -71,4 +84,11 @@ private[streaming] object HdfsUtils {
case _ => fs
}
}
+
+ /** Check if the file exists at the given path. */
+ def checkFileExists(path: String, conf: Configuration): Boolean = {
+ val hdpPath = new Path(path)
+ val fs = getFileSystemForPath(hdpPath, conf)
+ fs.isFile(hdpPath)
+ }
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index f793a12843..7db17abb79 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.File
+import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
+import org.apache.spark.streaming.util._
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -207,6 +208,75 @@ class ReceivedBlockTrackerSuite
tracker1.isWriteAheadLogEnabled should be (false)
}
+ test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
+ conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
+ require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
+
+ val addBlocks = generateBlockInfos()
+ val batch1 = addBlocks.slice(0, 1)
+ val batch2 = addBlocks.slice(1, 3)
+ val batch3 = addBlocks.slice(3, addBlocks.length)
+
+ assert(getWriteAheadLogFiles().length === 0)
+
+ // list of timestamps for files
+ val t = Seq.tabulate(5)(i => i * 1000)
+
+ writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+ assert(getWriteAheadLogFiles().length === 1)
+
+ // The goal is to create several log files which should have been cleaned up.
+ // If we face any issue during recovery, because these old files exist, then we need to make
+ // deletion more robust rather than a parallelized operation where we fire and forget
+ val batch1Allocation = createBatchAllocation(t(1), batch1)
+ writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
+
+ writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1))))
+
+ val batch2Allocation = createBatchAllocation(t(3), batch2)
+ writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) :+ batch2Allocation)
+
+ writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent))
+
+ // We should have 5 different log files as we called `writeEventsManually` with 5 different
+ // timestamps
+ assert(getWriteAheadLogFiles().length === 5)
+
+ // Create the tracker to recover from the log files. We're going to ask the tracker to clean
+ // things up, and then we're going to rewrite that data, and recover using a different tracker.
+ // They should have identical data no matter what
+ val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+
+ def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
+ subject.getBlocksOfBatchAndStream(t(3), streamId) should be(
+ base.getBlocksOfBatchAndStream(t(3), streamId))
+ subject.getBlocksOfBatchAndStream(t(1), streamId) should be(
+ base.getBlocksOfBatchAndStream(t(1), streamId))
+ subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil)
+ }
+
+ // ask the tracker to clean up some old files
+ tracker.cleanupOldBatches(t(3), waitForCompletion = true)
+ assert(getWriteAheadLogFiles().length === 3)
+
+ val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+ compareTrackers(tracker, tracker2)
+
+ // rewrite first file
+ writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+ assert(getWriteAheadLogFiles().length === 4)
+ // make sure trackers are consistent
+ val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+ compareTrackers(tracker, tracker3)
+
+ // rewrite second file
+ writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
+ assert(getWriteAheadLogFiles().length === 5)
+ // make sure trackers are consistent
+ val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+ compareTrackers(tracker, tracker4)
+ }
+
/**
* Create tracker object with the optional provided clock. Use fake clock if you
* want to control time by manually incrementing it to test log clean.
@@ -228,11 +298,30 @@ class ReceivedBlockTrackerSuite
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
}
+ /**
+ * Write received block tracker events to a file manually.
+ */
+ def writeEventsManually(filePath: String, events: Seq[ReceivedBlockTrackerLogEvent]): Unit = {
+ val writer = HdfsUtils.getOutputStream(filePath, hadoopConf)
+ events.foreach { event =>
+ val bytes = Utils.serialize(event)
+ writer.writeInt(bytes.size)
+ writer.write(bytes)
+ }
+ writer.close()
+ }
+
/** Get all the data written in the given write ahead log file. */
def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = {
getWrittenLogData(Seq(logFile))
}
+ /** Get the log file name for the given log start time. */
+ def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
+ checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
+ File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
+ }
+
/**
* Get all the data written in the given write ahead log files. By default, it will read all
* files in the test log directory.
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 9e13f25c2e..4273fd7dda 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -32,15 +33,13 @@ import org.apache.hadoop.fs.Path
import org.mockito.Matchers.{eq => meq}
import org.mockito.Matchers._
import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.Eventually._
import org.scalatest.{PrivateMethodTester, BeforeAndAfterEach, BeforeAndAfter}
import org.scalatest.mock.MockitoSugar
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{ThreadUtils, ManualClock, Utils}
+import org.apache.spark.util.{CompletionIterator, ThreadUtils, ManualClock, Utils}
import org.apache.spark.{SparkConf, SparkFunSuite}
/** Common tests for WriteAheadLogs that we would like to test with different configurations. */
@@ -198,6 +197,64 @@ class FileBasedWriteAheadLogSuite
import WriteAheadLogSuite._
+ test("FileBasedWriteAheadLog - seqToParIterator") {
+ /*
+ If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
+ files. This causes recovery to take a very long time. In order to make it quicker, we
+ parallelized the reading of these files. This test makes sure that we limit the number of
+ open files to the size of the number of threads in our thread pool rather than the size of
+ the list of files.
+ */
+ val numThreads = 8
+ val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
+ class GetMaxCounter {
+ private val value = new AtomicInteger()
+ @volatile private var max: Int = 0
+ def increment(): Unit = synchronized {
+ val atInstant = value.incrementAndGet()
+ if (atInstant > max) max = atInstant
+ }
+ def decrement(): Unit = synchronized { value.decrementAndGet() }
+ def get(): Int = synchronized { value.get() }
+ def getMax(): Int = synchronized { max }
+ }
+ try {
+ // If Jenkins is slow, we may not have a chance to run many threads simultaneously. Having
+ // a latch will make sure that all the threads can be launched altogether.
+ val latch = new CountDownLatch(1)
+ val testSeq = 1 to 1000
+ val counter = new GetMaxCounter()
+ def handle(value: Int): Iterator[Int] = {
+ new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
+ counter.increment()
+ // block so that other threads also launch
+ latch.await(10, TimeUnit.SECONDS)
+ override def completion() { counter.decrement() }
+ }
+ }
+ @volatile var collected: Seq[Int] = Nil
+ val t = new Thread() {
+ override def run() {
+ // run the calculation on a separate thread so that we can release the latch
+ val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle)
+ collected = iterator.toSeq
+ }
+ }
+ t.start()
+ eventually(Eventually.timeout(10.seconds)) {
+ // make sure we are doing a parallel computation!
+ assert(counter.getMax() > 1)
+ }
+ latch.countDown()
+ t.join(10000)
+ assert(collected === testSeq)
+ // make sure we didn't open too many Iterators
+ assert(counter.getMax() <= numThreads)
+ } finally {
+ tpool.shutdownNow()
+ }
+ }
+
test("FileBasedWriteAheadLogWriter - writing data") {
val dataToWrite = generateRandomData()
val segments = writeDataUsingWriter(testFile, dataToWrite)
@@ -259,6 +316,26 @@ class FileBasedWriteAheadLogSuite
assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
}
+ test("FileBasedWriteAheadLogReader - handles errors when file doesn't exist") {
+ // Write data manually for testing the sequential reader
+ val dataToWrite = generateRandomData()
+ writeDataUsingWriter(testFile, dataToWrite)
+ val tFile = new File(testFile)
+ assert(tFile.exists())
+ // Verify the data can be read and is same as the one correctly written
+ assert(readDataUsingReader(testFile) === dataToWrite)
+
+ tFile.delete()
+ assert(!tFile.exists())
+
+ val reader = new FileBasedWriteAheadLogReader(testFile, hadoopConf)
+ assert(!reader.hasNext)
+ reader.close()
+
+ // Verify that no exception is thrown if file doesn't exist
+ assert(readDataUsingReader(testFile) === Nil)
+ }
+
test("FileBasedWriteAheadLogRandomReader - reading data using random reader") {
// Write data manually for testing the random reader
val writtenData = generateRandomData()
@@ -581,7 +658,7 @@ object WriteAheadLogSuite {
closeFileAfterWrite: Boolean,
allowBatching: Boolean): Seq[String] = {
val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching)
- val data = wal.readAll().asScala.map(byteBufferToString).toSeq
+ val data = wal.readAll().asScala.map(byteBufferToString).toArray
wal.close()
data
}