aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala9
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala17
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala18
7 files changed, 42 insertions, 16 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 8b97db8dd3..f7a8ebee8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -42,7 +42,7 @@ private[streaming] trait ReceivedBlockHandler {
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
/** Cleanup old blocks older than the given threshold time */
- def cleanupOldBlock(threshTime: Long)
+ def cleanupOldBlocks(threshTime: Long)
}
@@ -82,7 +82,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
BlockManagerBasedStoreResult(blockId)
}
- def cleanupOldBlock(threshTime: Long) {
+ def cleanupOldBlocks(threshTime: Long) {
// this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
// of BlockRDDs.
}
@@ -192,8 +192,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
WriteAheadLogBasedStoreResult(blockId, segment)
}
- def cleanupOldBlock(threshTime: Long) {
- logManager.cleanupOldLogs(threshTime)
+ def cleanupOldBlocks(threshTime: Long) {
+ logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
}
def stop() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 02758e0bca..2ce458cdde 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -139,14 +139,17 @@ private[streaming] class ReceivedBlockTracker(
getReceivedBlockQueue(streamId).toSeq
}
- /** Clean up block information of old batches. */
- def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
+ /**
+ * Clean up block information of old batches. If waitForCompletion is true, this method
+ * returns only after the files are cleaned up.
+ */
+ def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
assert(cleanupThreshTime.milliseconds < clock.currentTime())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo("Deleting batches " + timesToCleanup)
writeToLog(BatchCleanupEvent(timesToCleanup))
timeToAllocatedBlocks --= timesToCleanup
- logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
+ logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
log
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 1f0e442a12..8dbb42a86e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -121,7 +121,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** Clean up metadata older than the given threshold time */
def cleanupOldMetadata(cleanupThreshTime: Time) {
- receivedBlockTracker.cleanupOldBatches(cleanupThreshTime)
+ receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
}
/** Register a receiver */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 70d234320b..166661b749 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -19,11 +19,11 @@ package org.apache.spark.streaming.util
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.Logging
import org.apache.spark.util.Utils
import WriteAheadLogManager._
@@ -124,8 +124,12 @@ private[streaming] class WriteAheadLogManager(
* files, which is usually based on the local system time. So if there is coordination necessary
* between the node calculating the threshTime (say, driver node), and the local system time
* (say, worker node), the caller has to take account of possible time skew.
+ *
+ * If waitForCompletion is set to true, this method will return only after old logs have been
+ * deleted. This should be set to true only for testing. Else the files will be deleted
+ * asynchronously.
*/
- def cleanupOldLogs(threshTime: Long): Unit = {
+ def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
@@ -146,10 +150,15 @@ private[streaming] class WriteAheadLogManager(
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
}
if (!executionContext.isShutdown) {
- Future { deleteFiles() }
+ val f = Future { deleteFiles() }
+ if (waitForCompletion) {
+ import scala.concurrent.duration._
+ Await.ready(f, 1 second)
+ }
}
}
+
/** Stop the manager, close any open log writer */
def stop(): Unit = synchronized {
if (currentLogWriter != null) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 3661e16a9e..132ff2443f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -168,7 +168,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
manualClock.currentTime() shouldEqual 5000L
val cleanupThreshTime = 3000L
- handler.cleanupOldBlock(cleanupThreshTime)
+ handler.cleanupOldBlocks(cleanupThreshTime)
eventually(timeout(10000 millis), interval(10 millis)) {
getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 01a09b67b9..de7e9d624b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -166,7 +166,7 @@ class ReceivedBlockTrackerSuite
// Cleanup first batch but not second batch
val oldestLogFile = getWriteAheadLogFiles().head
incrementTime()
- tracker3.cleanupOldBatches(batchTime2)
+ tracker3.cleanupOldBatches(batchTime2, waitForCompletion = true)
// Verify that the batch allocations have been cleaned, and the act has been written to log
tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual Seq.empty
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 8f69bcb642..7ce9499dc6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -182,15 +182,29 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
}
test("WriteAheadLogManager - cleanup old logs") {
+ logCleanUpTest(waitForCompletion = false)
+ }
+
+ test("WriteAheadLogManager - cleanup old logs synchronously") {
+ logCleanUpTest(waitForCompletion = true)
+ }
+
+ private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
// Write data with manager, recover with new manager and verify
val manualClock = new ManualClock
val dataToWrite = generateRandomData()
manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
val logFiles = getLogFilesInDirectory(testDir)
assert(logFiles.size > 1)
- manager.cleanupOldLogs(manualClock.currentTime() / 2)
- eventually(timeout(1 second), interval(10 milliseconds)) {
+
+ manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion)
+
+ if (waitForCompletion) {
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+ } else {
+ eventually(timeout(1 second), interval(10 milliseconds)) {
+ assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+ }
}
}