path: root/streaming
author: Hari Shreedharan <hshreedharan@apache.org> 2014-12-31 14:35:07 -0800
committer: Tathagata Das <tathagata.das1565@gmail.com> 2014-12-31 14:35:07 -0800
commit: 3610d3c615112faef98d94f04efaea602cc4aa8f (patch)
tree: df92a47e1874f6846f9466dddafe7101bebe4e5e /streaming
parent: c88a3d7fca20d36ee566d48e0cb91fe33a7a6d99 (diff)
download: spark-3610d3c615112faef98d94f04efaea602cc4aa8f.tar.gz
          spark-3610d3c615112faef98d94f04efaea602cc4aa8f.tar.bz2
          spark-3610d3c615112faef98d94f04efaea602cc4aa8f.zip
[SPARK-4790][STREAMING] Fix ReceivedBlockTrackerSuite waits for old files to get deleted before continuing.

Since the deletes happen asynchronously, the getFileStatus call can throw an exception in older HDFS versions if a file is deleted between the time listFiles is called on the directory and getFileStatus is called on that file. This PR addresses the issue by adding an option to delete the files synchronously and wait for the deletion to complete before proceeding.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #3726 from harishreedharan/spark-4790 and squashes the following commits:

bbbacd1 [Hari Shreedharan] Call cleanUpOldLogs only once in the tests.
3255f17 [Hari Shreedharan] Add test for async deletion. Remove method from ReceiverTracker that does not take waitForCompletion.
e4c83ec [Hari Shreedharan] Make waitForCompletion a mandatory param. Remove eventually from WALSuite since the cleanup method returns only after all files are deleted.
af00fd1 [Hari Shreedharan] [SPARK-4790][STREAMING] Fix ReceivedBlockTrackerSuite waits for old files to get deleted before continuing.
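
For context, a minimal sketch of the pattern this commit introduces, assuming a single-threaded execution context like the one WriteAheadLogManager uses: the deletion runs in a Future by default, and the caller blocks on it with Await.ready only when waitForCompletion is set. The deleteOldFiles helper and the one-second timeout are illustrative stand-ins, not the exact Spark internals.

    import java.util.concurrent.Executors
    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}

    object CleanupSketch {
      // Serialize deletions on one thread, mirroring the manager's execution context.
      implicit val ec: ExecutionContext =
        ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

      // Hypothetical stand-in for the real loop that removes WAL files older than threshTime.
      def deleteOldFiles(threshTime: Long): Unit = ()

      def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
        val f = Future { deleteOldFiles(threshTime) } // asynchronous by default
        if (waitForCompletion) {
          Await.ready(f, 1.second) // block until the delete finishes (tests only)
        }
      }
    }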
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 8
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala | 9
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala | 17
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 2
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 2
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 18
7 files changed, 42 insertions, 16 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 8b97db8dd3..f7a8ebee8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -42,7 +42,7 @@ private[streaming] trait ReceivedBlockHandler {
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
/** Cleanup old blocks older than the given threshold time */
- def cleanupOldBlock(threshTime: Long)
+ def cleanupOldBlocks(threshTime: Long)
}
@@ -82,7 +82,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
BlockManagerBasedStoreResult(blockId)
}
- def cleanupOldBlock(threshTime: Long) {
+ def cleanupOldBlocks(threshTime: Long) {
// this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
// of BlockRDDs.
}
@@ -192,8 +192,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
WriteAheadLogBasedStoreResult(blockId, segment)
}
- def cleanupOldBlock(threshTime: Long) {
- logManager.cleanupOldLogs(threshTime)
+ def cleanupOldBlocks(threshTime: Long) {
+ logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
}
def stop() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 02758e0bca..2ce458cdde 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -139,14 +139,17 @@ private[streaming] class ReceivedBlockTracker(
getReceivedBlockQueue(streamId).toSeq
}
- /** Clean up block information of old batches. */
- def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
+ /**
+ * Clean up block information of old batches. If waitForCompletion is true, this method
+ * returns only after the files are cleaned up.
+ */
+ def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
assert(cleanupThreshTime.milliseconds < clock.currentTime())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo("Deleting batches " + timesToCleanup)
writeToLog(BatchCleanupEvent(timesToCleanup))
timeToAllocatedBlocks --= timesToCleanup
- logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
+ logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion))
log
}
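
As a usage note on the new signature above, production callers keep the fire-and-forget behavior while tests opt into blocking; the tracker value below is a hypothetical ReceivedBlockTracker instance:

    // Scheduler path: do not block on file deletion.
    tracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
    // Test path: return only after the old WAL files are actually gone.
    tracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = true)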
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 1f0e442a12..8dbb42a86e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -121,7 +121,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** Clean up metadata older than the given threshold time */
def cleanupOldMetadata(cleanupThreshTime: Time) {
- receivedBlockTracker.cleanupOldBatches(cleanupThreshTime)
+ receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
}
/** Register a receiver */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 70d234320b..166661b749 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -19,11 +19,11 @@ package org.apache.spark.streaming.util
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.Logging
import org.apache.spark.util.Utils
import WriteAheadLogManager._
@@ -124,8 +124,12 @@ private[streaming] class WriteAheadLogManager(
* files, which is usually based on the local system time. So if there is coordination necessary
* between the node calculating the threshTime (say, driver node), and the local system time
* (say, worker node), the caller has to take account of possible time skew.
+ *
+ * If waitForCompletion is set to true, this method will return only after old logs have been
+ * deleted. This should be set to true only for testing. Else the files will be deleted
+ * asynchronously.
*/
- def cleanupOldLogs(threshTime: Long): Unit = {
+ def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
@@ -146,10 +150,15 @@ private[streaming] class WriteAheadLogManager(
logInfo(s"Cleared log files in $logDirectory older than $threshTime")
}
if (!executionContext.isShutdown) {
- Future { deleteFiles() }
+ val f = Future { deleteFiles() }
+ if (waitForCompletion) {
+ import scala.concurrent.duration._
+ Await.ready(f, 1 second)
+ }
}
}
+
/** Stop the manager, close any open log writer */
def stop(): Unit = synchronized {
if (currentLogWriter != null) {
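
One design note on the hunk above: Await.ready waits for the future to complete (or throws a TimeoutException if the duration elapses first) but, unlike Await.result, does not rethrow an exception raised inside deleteFiles. A self-contained illustration of that difference, with illustrative names rather than Spark's:

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    val failing = Future[Unit] { throw new RuntimeException("delete failed") }
    // Returns normally once the future completes, even though it failed...
    Await.ready(failing, 1.second)
    // ...whereas Await.result(failing, 1.second) would rethrow the RuntimeException.
    println(failing.value) // Some(Failure(java.lang.RuntimeException: delete failed))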
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 3661e16a9e..132ff2443f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -168,7 +168,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
manualClock.currentTime() shouldEqual 5000L
val cleanupThreshTime = 3000L
- handler.cleanupOldBlock(cleanupThreshTime)
+ handler.cleanupOldBlocks(cleanupThreshTime)
eventually(timeout(10000 millis), interval(10 millis)) {
getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 01a09b67b9..de7e9d624b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -166,7 +166,7 @@ class ReceivedBlockTrackerSuite
// Cleanup first batch but not second batch
val oldestLogFile = getWriteAheadLogFiles().head
incrementTime()
- tracker3.cleanupOldBatches(batchTime2)
+ tracker3.cleanupOldBatches(batchTime2, waitForCompletion = true)
// Verify that the batch allocations have been cleaned, and the act has been written to log
tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual Seq.empty
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 8f69bcb642..7ce9499dc6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -182,15 +182,29 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
}
test("WriteAheadLogManager - cleanup old logs") {
+ logCleanUpTest(waitForCompletion = false)
+ }
+
+ test("WriteAheadLogManager - cleanup old logs synchronously") {
+ logCleanUpTest(waitForCompletion = true)
+ }
+
+ private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
// Write data with manager, recover with new manager and verify
val manualClock = new ManualClock
val dataToWrite = generateRandomData()
manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
val logFiles = getLogFilesInDirectory(testDir)
assert(logFiles.size > 1)
- manager.cleanupOldLogs(manualClock.currentTime() / 2)
- eventually(timeout(1 second), interval(10 milliseconds)) {
+
+ manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion)
+
+ if (waitForCompletion) {
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+ } else {
+ eventually(timeout(1 second), interval(10 milliseconds)) {
+ assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+ }
}
}