Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala                  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala             78
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala       17
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala                          24
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala               91
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala                 87
7 files changed, 268 insertions, 37 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 2480b4ec09..1ed6fb0aa9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
if (eventLoop == null) return // scheduler has already been stopped
logDebug("Stopping JobScheduler")
- // First, stop receiving
- receiverTracker.stop(processAllReceivedData)
+ if (receiverTracker != null) {
+ // First, stop receiving
+ receiverTracker.stop(processAllReceivedData)
+ }
// Second, stop generating jobs. If it has to process all received data,
// then this will wait for all the processing through JobScheduler to be over.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index bc3f2486c2..72705f1a9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,10 +17,12 @@
package org.apache.spark.streaming.util
import java.nio.ByteBuffer
+import java.util.concurrent.ThreadPoolExecutor
import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.ThreadPoolTaskSupport
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps
@@ -57,8 +59,8 @@ private[streaming] class FileBasedWriteAheadLog(
private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")
private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
- implicit private val executionContext = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
+ private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 20)
+ private val executionContext = ExecutionContext.fromExecutorService(threadpool)
override protected val logName = s"WriteAheadLogManager $callerNameTag"
private var currentLogPath: Option[String] = None
@@ -124,13 +126,19 @@ private[streaming] class FileBasedWriteAheadLog(
*/
def readAll(): JIterator[ByteBuffer] = synchronized {
val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
- logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
-
- logFilesToRead.iterator.map { file =>
+ logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
+ def readFile(file: String): Iterator[ByteBuffer] = {
logDebug(s"Creating log reader with $file")
val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
- }.flatten.asJava
+ }
+ if (!closeFileAfterWrite) {
+ logFilesToRead.iterator.map(readFile).flatten.asJava
+ } else {
+ // For performance gains, it makes sense to parallelize the recovery if
+ // closeFileAfterWrite = true
+ seqToParIterator(threadpool, logFilesToRead, readFile).asJava
+ }
}
/**
@@ -146,30 +154,33 @@ private[streaming] class FileBasedWriteAheadLog(
* asynchronously.
*/
def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
- val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+ val oldLogFiles = synchronized {
+ val expiredLogs = pastLogs.filter { _.endTime < threshTime }
+ pastLogs --= expiredLogs
+ expiredLogs
+ }
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
- def deleteFiles() {
- oldLogFiles.foreach { logInfo =>
- try {
- val path = new Path(logInfo.path)
- val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
- fs.delete(path, true)
- synchronized { pastLogs -= logInfo }
- logDebug(s"Cleared log file $logInfo")
- } catch {
- case ex: Exception =>
- logWarning(s"Error clearing write ahead log file $logInfo", ex)
- }
+ def deleteFile(walInfo: LogInfo): Unit = {
+ try {
+ val path = new Path(walInfo.path)
+ val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+ fs.delete(path, true)
+ logDebug(s"Cleared log file $walInfo")
+ } catch {
+ case ex: Exception =>
+ logWarning(s"Error clearing write ahead log file $walInfo", ex)
}
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
}
- if (!executionContext.isShutdown) {
- val f = Future { deleteFiles() }
- if (waitForCompletion) {
- import scala.concurrent.duration._
- Await.ready(f, 1 second)
+ oldLogFiles.foreach { logInfo =>
+ if (!executionContext.isShutdown) {
+ val f = Future { deleteFile(logInfo) }(executionContext)
+ if (waitForCompletion) {
+ import scala.concurrent.duration._
+ Await.ready(f, 1 second)
+ }
}
}
}
@@ -251,4 +262,23 @@ private[streaming] object FileBasedWriteAheadLog {
}
}.sortBy { _.startTime }
}
+
+ /**
+ * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
+ * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
+ * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
+ * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
+ */
+ def seqToParIterator[I, O](
+ tpool: ThreadPoolExecutor,
+ source: Seq[I],
+ handler: I => Iterator[O]): Iterator[O] = {
+ val taskSupport = new ThreadPoolTaskSupport(tpool)
+ val groupSize = tpool.getMaximumPoolSize.max(8)
+ source.grouped(groupSize).flatMap { group =>
+ val parallelCollection = group.par
+ parallelCollection.tasksupport = taskSupport
+ parallelCollection.map(handler)
+ }.flatten
+ }
}
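
For readers skimming the patch, the sketch below shows the pattern seqToParIterator relies on, standalone and outside the WAL classes. It is illustrative only (pool size, data, and names are made up) and uses the pre-2.13 scala.collection.parallel API that this code targets: because grouped produces its chunks lazily as the flattened iterator is consumed, at most one group of poolSize handlers is ever open at a time.

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.collection.parallel.ThreadPoolTaskSupport

object BoundedParallelRecoverySketch {
  def main(args: Array[String]): Unit = {
    // Illustrative pool; the real code uses ThreadUtils.newDaemonCachedThreadPool(name, 20)
    val pool = new ThreadPoolExecutor(4, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable])
    val taskSupport = new ThreadPoolTaskSupport(pool)

    // Stand-in for readFile: each call "opens" one resource
    def open(i: Int): Iterator[String] = {
      println(s"opening source $i")
      Iterator(s"record-$i-a", s"record-$i-b")
    }

    val sources = 1 to 16
    // grouped() is lazy: the next group of poolSize sources is only opened once the
    // previous group's records have been drained from the flattened iterator.
    val records = sources.grouped(pool.getMaximumPoolSize).flatMap { group =>
      val par = group.par
      par.tasksupport = taskSupport
      par.map(open)
    }.flatten

    records.foreach(println)
    pool.shutdown()
  }
}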
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
index f7168229ec..56d4977da0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
@@ -30,7 +30,7 @@ private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf:
extends Closeable {
private val instream = HdfsUtils.getInputStream(path, conf)
- private var closed = false
+ private var closed = (instream == null) // the file may be deleted as we're opening the stream
def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
assertOpen()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
index c3bb59f3fe..a375c07295 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.util
-import java.io.{Closeable, EOFException}
+import java.io.{IOException, Closeable, EOFException}
import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
@@ -32,7 +32,7 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
extends Iterator[ByteBuffer] with Closeable with Logging {
private val instream = HdfsUtils.getInputStream(path, conf)
- private var closed = false
+ private var closed = (instream == null) // the file may be deleted as we're opening the stream
private var nextItem: Option[ByteBuffer] = None
override def hasNext: Boolean = synchronized {
@@ -55,6 +55,19 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
logDebug("Error reading next item, EOF reached", e)
close()
false
+ case e: IOException =>
+ logWarning("Error while trying to read data. If the file was deleted, " +
+ "this should be okay.", e)
+ close()
+ if (HdfsUtils.checkFileExists(path, conf)) {
+ // If file exists, this could be a legitimate error
+ throw e
+ } else {
+ // File was deleted. This can occur when the daemon cleanup thread takes time to
+ // delete the file during recovery.
+ false
+ }
+
case e: Exception =>
logWarning("Error while trying to read data from HDFS.", e)
close()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index f60688f173..13a765d035 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.streaming.util
+import java.io.IOException
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -42,8 +44,19 @@ private[streaming] object HdfsUtils {
def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
- val instream = dfs.open(dfsPath)
- instream
+ if (dfs.isFile(dfsPath)) {
+ try {
+ dfs.open(dfsPath)
+ } catch {
+ case e: IOException =>
+ // If we are really unlucky, the file may be deleted as we're opening the stream.
+ // This can happen as clean up is performed by daemon threads that may be left over from
+ // previous runs.
+ if (!dfs.isFile(dfsPath)) null else throw e
+ }
+ } else {
+ null
+ }
}
def checkState(state: Boolean, errorMsg: => String) {
@@ -71,4 +84,11 @@ private[streaming] object HdfsUtils {
case _ => fs
}
}
+
+ /** Check if the file exists at the given path. */
+ def checkFileExists(path: String, conf: Configuration): Boolean = {
+ val hdpPath = new Path(path)
+ val fs = getFileSystemForPath(hdpPath, conf)
+ fs.isFile(hdpPath)
+ }
}
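
Taken together with the reader changes above, the deleted-while-opening race is handled in three places: getInputStream returns null when the file has vanished, the readers start out closed when handed a null stream, and an IOException during a read is re-thrown only if checkFileExists reports the file is still there. The sketch below shows that flow in isolation; the helper names are made up for illustration and this is not code from the patch.

import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

object MissingFileTolerantRead {
  /** Open `path`, returning null if the file was deleted underneath us. */
  def openIfPresent(fs: FileSystem, path: Path): FSDataInputStream = {
    if (!fs.isFile(path)) return null
    try {
      fs.open(path)
    } catch {
      // The cleanup thread may delete the file between isFile() and open().
      case e: IOException => if (!fs.isFile(path)) null else throw e
    }
  }

  def readAllBytes(pathStr: String, conf: Configuration): Option[Array[Byte]] = {
    val path = new Path(pathStr)
    val fs = path.getFileSystem(conf)
    val in = openIfPresent(fs, path)
    if (in == null) return None // like a reader whose `closed` flag starts out true
    try {
      val buf = new Array[Byte](fs.getFileStatus(path).getLen.toInt)
      in.readFully(0, buf)
      Some(buf)
    } catch {
      // A read error on a file that no longer exists is treated as "no data",
      // mirroring the IOException branch added to FileBasedWriteAheadLogReader.
      case e: IOException => if (fs.isFile(path)) throw e else None
    } finally {
      in.close()
    }
  }
}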
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index f793a12843..7db17abb79 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.File
+import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.{WriteAheadLogUtils, FileBasedWriteAheadLogReader}
+import org.apache.spark.streaming.util._
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -207,6 +208,75 @@ class ReceivedBlockTrackerSuite
tracker1.isWriteAheadLogEnabled should be (false)
}
+ test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
+ conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
+ require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
+
+ val addBlocks = generateBlockInfos()
+ val batch1 = addBlocks.slice(0, 1)
+ val batch2 = addBlocks.slice(1, 3)
+ val batch3 = addBlocks.slice(3, addBlocks.length)
+
+ assert(getWriteAheadLogFiles().length === 0)
+
+ // list of timestamps for files
+ val t = Seq.tabulate(5)(i => i * 1000)
+
+ writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+ assert(getWriteAheadLogFiles().length === 1)
+
+ // The goal is to create several log files which should have been cleaned up.
+ // If we face any issue during recovery, because these old files exist, then we need to make
+ // deletion more robust rather than a parallelized operation where we fire and forget
+ val batch1Allocation = createBatchAllocation(t(1), batch1)
+ writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
+
+ writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1))))
+
+ val batch2Allocation = createBatchAllocation(t(3), batch2)
+ writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) :+ batch2Allocation)
+
+ writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent))
+
+ // We should have 5 different log files as we called `writeEventsManually` with 5 different
+ // timestamps
+ assert(getWriteAheadLogFiles().length === 5)
+
+ // Create the tracker to recover from the log files. We're going to ask the tracker to clean
+ // things up, and then we're going to rewrite that data, and recover using a different tracker.
+ // They should have identical data no matter what
+ val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+
+ def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
+ subject.getBlocksOfBatchAndStream(t(3), streamId) should be(
+ base.getBlocksOfBatchAndStream(t(3), streamId))
+ subject.getBlocksOfBatchAndStream(t(1), streamId) should be(
+ base.getBlocksOfBatchAndStream(t(1), streamId))
+ subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil)
+ }
+
+ // ask the tracker to clean up some old files
+ tracker.cleanupOldBatches(t(3), waitForCompletion = true)
+ assert(getWriteAheadLogFiles().length === 3)
+
+ val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+ compareTrackers(tracker, tracker2)
+
+ // rewrite first file
+ writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+ assert(getWriteAheadLogFiles().length === 4)
+ // make sure trackers are consistent
+ val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+ compareTrackers(tracker, tracker3)
+
+ // rewrite second file
+ writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
+ assert(getWriteAheadLogFiles().length === 5)
+ // make sure trackers are consistent
+ val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
+ compareTrackers(tracker, tracker4)
+ }
+
/**
* Create tracker object with the optional provided clock. Use fake clock if you
* want to control time by manually incrementing it to test log clean.
@@ -228,11 +298,30 @@ class ReceivedBlockTrackerSuite
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
}
+ /**
+ * Write received block tracker events to a file manually.
+ */
+ def writeEventsManually(filePath: String, events: Seq[ReceivedBlockTrackerLogEvent]): Unit = {
+ val writer = HdfsUtils.getOutputStream(filePath, hadoopConf)
+ events.foreach { event =>
+ val bytes = Utils.serialize(event)
+ writer.writeInt(bytes.size)
+ writer.write(bytes)
+ }
+ writer.close()
+ }
+
/** Get all the data written in the given write ahead log file. */
def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = {
getWrittenLogData(Seq(logFile))
}
+ /** Get the log file name for the given log start time. */
+ def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
+ checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
+ File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
+ }
+
/**
* Get all the data written in the given write ahead log files. By default, it will read all
* files in the test log directory.
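
writeEventsManually above works because it produces files in the same length-prefixed format that FileBasedWriteAheadLogWriter uses: a 4-byte record length (DataOutputStream#writeInt, big-endian) followed by the serialized event, which is what FileBasedWriteAheadLogReader consumes during recovery. A small sketch of reading such a file back with plain java.io, outside the WAL classes and assuming nothing Spark-specific:

import java.io.{DataInputStream, EOFException, FileInputStream}

object WalRecordFraming {
  /** Read length-prefixed records (Int length + payload) until EOF. */
  def readRecords(file: String): Seq[Array[Byte]] = {
    val in = new DataInputStream(new FileInputStream(file))
    val records = scala.collection.mutable.ArrayBuffer[Array[Byte]]()
    try {
      while (true) {
        val length = in.readInt()   // matches writer.writeInt(bytes.size)
        val payload = new Array[Byte](length)
        in.readFully(payload)       // matches writer.write(bytes)
        records += payload
      }
    } catch {
      case _: EOFException => // end of log file
    } finally {
      in.close()
    }
    records
  }
}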
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 9e13f25c2e..4273fd7dda 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -32,15 +33,13 @@ import org.apache.hadoop.fs.Path
import org.mockito.Matchers.{eq => meq}
import org.mockito.Matchers._
import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.Eventually._
import org.scalatest.{PrivateMethodTester, BeforeAndAfterEach, BeforeAndAfter}
import org.scalatest.mock.MockitoSugar
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{ThreadUtils, ManualClock, Utils}
+import org.apache.spark.util.{CompletionIterator, ThreadUtils, ManualClock, Utils}
import org.apache.spark.{SparkConf, SparkFunSuite}
/** Common tests for WriteAheadLogs that we would like to test with different configurations. */
@@ -198,6 +197,64 @@ class FileBasedWriteAheadLogSuite
import WriteAheadLogSuite._
+ test("FileBasedWriteAheadLog - seqToParIterator") {
+ /*
+ If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
+ files. This causes recovery to take a very long time. In order to make it quicker, we
+ parallelized the reading of these files. This test makes sure that we limit the number of
+ open files to the size of the number of threads in our thread pool rather than the size of
+ the list of files.
+ */
+ val numThreads = 8
+ val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
+ class GetMaxCounter {
+ private val value = new AtomicInteger()
+ @volatile private var max: Int = 0
+ def increment(): Unit = synchronized {
+ val atInstant = value.incrementAndGet()
+ if (atInstant > max) max = atInstant
+ }
+ def decrement(): Unit = synchronized { value.decrementAndGet() }
+ def get(): Int = synchronized { value.get() }
+ def getMax(): Int = synchronized { max }
+ }
+ try {
+ // If Jenkins is slow, we may not have a chance to run many threads simultaneously. Having
+ // a latch will make sure that all the threads can be launched altogether.
+ val latch = new CountDownLatch(1)
+ val testSeq = 1 to 1000
+ val counter = new GetMaxCounter()
+ def handle(value: Int): Iterator[Int] = {
+ new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
+ counter.increment()
+ // block so that other threads also launch
+ latch.await(10, TimeUnit.SECONDS)
+ override def completion() { counter.decrement() }
+ }
+ }
+ @volatile var collected: Seq[Int] = Nil
+ val t = new Thread() {
+ override def run() {
+ // run the calculation on a separate thread so that we can release the latch
+ val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle)
+ collected = iterator.toSeq
+ }
+ }
+ t.start()
+ eventually(Eventually.timeout(10.seconds)) {
+ // make sure we are doing a parallel computation!
+ assert(counter.getMax() > 1)
+ }
+ latch.countDown()
+ t.join(10000)
+ assert(collected === testSeq)
+ // make sure we didn't open too many Iterators
+ assert(counter.getMax() <= numThreads)
+ } finally {
+ tpool.shutdownNow()
+ }
+ }
+
test("FileBasedWriteAheadLogWriter - writing data") {
val dataToWrite = generateRandomData()
val segments = writeDataUsingWriter(testFile, dataToWrite)
@@ -259,6 +316,26 @@ class FileBasedWriteAheadLogSuite
assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
}
+ test("FileBasedWriteAheadLogReader - handles errors when file doesn't exist") {
+ // Write data manually for testing the sequential reader
+ val dataToWrite = generateRandomData()
+ writeDataUsingWriter(testFile, dataToWrite)
+ val tFile = new File(testFile)
+ assert(tFile.exists())
+ // Verify the data can be read and is same as the one correctly written
+ assert(readDataUsingReader(testFile) === dataToWrite)
+
+ tFile.delete()
+ assert(!tFile.exists())
+
+ val reader = new FileBasedWriteAheadLogReader(testFile, hadoopConf)
+ assert(!reader.hasNext)
+ reader.close()
+
+ // Verify that no exception is thrown if file doesn't exist
+ assert(readDataUsingReader(testFile) === Nil)
+ }
+
test("FileBasedWriteAheadLogRandomReader - reading data using random reader") {
// Write data manually for testing the random reader
val writtenData = generateRandomData()
@@ -581,7 +658,7 @@ object WriteAheadLogSuite {
closeFileAfterWrite: Boolean,
allowBatching: Boolean): Seq[String] = {
val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching)
- val data = wal.readAll().asScala.map(byteBufferToString).toSeq
+ val data = wal.readAll().asScala.map(byteBufferToString).toArray
wal.close()
data
}
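
One note on this last hunk: readAll() now returns an iterator that opens log files lazily, and Iterator#toSeq in the Scala versions Spark targets here (2.10/2.11) builds a Stream, so with .toSeq most of the data would only be read after wal.close(). Using .toArray forces the read while the log is still open. A tiny, Spark-free illustration of the difference, with made-up names:

object LazyToSeqPitfall extends App {
  var open = true
  def readChunk(i: Int): String = {
    require(open, s"tried to read chunk $i after close()")
    s"chunk-$i"
  }

  val iter = (1 to 3).iterator.map(readChunk)

  // Iterator#toSeq builds a Stream here: only the head is read eagerly.
  // (Calling .toArray instead would read all three chunks up front.)
  val lazySeq = iter.toSeq

  open = false  // analogous to wal.close()

  // Forcing the rest of the Stream now fails, because the source is "closed".
  try println(lazySeq.mkString(", "))
  catch { case e: IllegalArgumentException => println(s"read after close: ${e.getMessage}") }
}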