authorjerryshao <saisai.shao@intel.com>2015-01-22 21:58:53 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-01-22 21:59:22 -0800
commit5aaf0e0ff5e5082c0064f5f4065cd66a62aa72d6 (patch)
tree8d12d05873c28a9bc691ba459bd0c324886112e5
parent5d07488adc074ff8d5a10980dfc25cd2d33d0cf0 (diff)
[SPARK-5233][Streaming] Fix bug introduced by incorrect WAL replay
Because of the missing `BlockAllocationEvent` in WAL recovery, the dangling events will mix into the new batch, which leads to wrong results. Details can be seen in [SPARK-5233](https://issues.apache.org/jira/browse/SPARK-5233).

Author: jerryshao <saisai.shao@intel.com>

Closes #4032 from jerryshao/SPARK-5233 and squashes the following commits:

f0b0c0b [jerryshao] Further address the comments
a237c75 [jerryshao] Address the comments
e356258 [jerryshao] Fix bug in unit test
558bdc3 [jerryshao] Correctly replay the WAL log when recovering from failure

(cherry picked from commit 3c3fa632e6ba45ce536065aa1145698385301fb2)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
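To see why those dangling blocks matter, here is a minimal sketch of WAL event replay, with hypothetical event and class names rather than Spark's actual API: if the log ends after some AddBlock events but before the matching batch allocation event, the replayed tracker is left holding blocks in its unallocated queues.

import scala.collection.mutable

sealed trait TrackerEvent
case class AddBlock(streamId: Int, blockId: String) extends TrackerEvent
case class BatchAllocation(batchTime: Long, blocks: Map[Int, Seq[String]]) extends TrackerEvent
case class BatchCleanup(batchTimes: Seq[Long]) extends TrackerEvent

class ReplayedState {
  val unallocated = mutable.Map.empty[Int, mutable.Queue[String]]
  val allocatedByTime = mutable.Map.empty[Long, Map[Int, Seq[String]]]

  def replay(events: Seq[TrackerEvent]): Unit = events.foreach {
    case AddBlock(streamId, blockId) =>
      // Added blocks wait in a per-stream queue until a batch claims them.
      unallocated.getOrElseUpdate(streamId, mutable.Queue.empty) += blockId
    case BatchAllocation(time, blocks) =>
      allocatedByTime(time) = blocks
      blocks.keys.foreach(unallocated.remove) // these blocks stop being dangling
    case BatchCleanup(times) =>
      times.foreach(allocatedByTime.remove)
  }
}

After replay, anything still sitting in unallocated is a dangling block; before this patch it would be swept into the first brand-new batch after restart, producing wrong results.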
-rw-r--r--  examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala | 12
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 20
4 files changed, 32 insertions(+), 20 deletions(-)
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index c9e1511278..82aeaaf035 100644
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -77,7 +77,7 @@ object KafkaWordCountProducer {
val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
- // Zookeper connection properties
+ // Zookeeper connection properties
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index d86f852aba..8632c94349 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,12 +17,14 @@
package org.apache.spark.streaming.scheduler
-import akka.actor.{ActorRef, ActorSystem, Props, Actor}
-import org.apache.spark.{SparkException, SparkEnv, Logging}
-import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
-import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
import scala.util.{Failure, Success, Try}
+import akka.actor.{ActorRef, Props, Actor}
+
+import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.util.{Clock, ManualClock, RecurringTimer}
+
/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
@@ -206,9 +208,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
- timesToReschedule.foreach(time =>
+ timesToReschedule.foreach { time =>
+ // Allocate the related blocks when recovering from failure, because some blocks that were
+ // added but not allocated are left dangling in the queue after recovery. We have to allocate
+ // those blocks to the next batch, which is the batch they were supposed to go to.
+ jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
- )
+ }
// Restart the timer
timer.start(restartTime.milliseconds)
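The ordering inside the rescheduling loop above matters: a batch's blocks must be allocated before generateJobs(time) runs, because the generated jobs read that batch's block allocation. A self-contained toy version of the loop, where all names are hypothetical stand-ins for the real JobGenerator collaborators and Long stands in for Time:

// Toy model of the patched restart loop; the two function parameters stand
// in for the receiver tracker and the job scheduler.
def reschedule(
    pendingTimes: Seq[Long],
    downTimes: Seq[Long],
    allocateBlocksToBatch: Long => Unit,
    submitJobsFor: Long => Unit): Unit = {
  val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted
  timesToReschedule.foreach { time =>
    allocateBlocksToBatch(time) // first: attach dangling recovered blocks to this batch
    submitJobsFor(time)         // then: generate and submit jobs that read that allocation
  }
}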
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index ef23b5c79f..e19ac939f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -67,7 +67,7 @@ private[streaming] class ReceivedBlockTracker(
extends Logging {
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
-
+
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
private val logManagerOption = createLogManager()
@@ -107,8 +107,14 @@ private[streaming] class ReceivedBlockTracker(
lastAllocatedBatchTime = batchTime
allocatedBlocks
} else {
- throw new SparkException(s"Unexpected allocation of blocks, " +
- s"last batch = $lastAllocatedBatchTime, batch time to allocate = $batchTime ")
+ // This situation occurs when:
+ // 1. The WAL ends with a BatchAllocationEvent but without a BatchCleanupEvent,
+ // so a possibly processed or half-processed batch job needs to be processed
+ // again, and the batchTime will be equal to lastAllocatedBatchTime.
+ // 2. Slow checkpointing makes the recovered batch time older than the
+ // lastAllocatedBatchTime recovered from the WAL.
+ // This situation only occurs during recovery.
+ logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
}
}
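The replaced exception above means that double allocation is now a logged no-op rather than a failure. A simplified, self-contained sketch of that idempotent guard, as a hypothetical standalone class rather than Spark's ReceivedBlockTracker itself:

import scala.collection.mutable

class SimpleBlockTracker {
  private val unallocated = mutable.Map.empty[Int, mutable.Queue[String]]
  private val timeToBlocks = mutable.Map.empty[Long, Map[Int, Seq[String]]]
  private var lastAllocatedBatchTime: Long = -1L

  def addBlock(streamId: Int, blockId: String): Unit =
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty) += blockId

  def allocateBlocksToBatch(batchTime: Long): Unit = {
    if (batchTime > lastAllocatedBatchTime) {
      // Normal path: drain every unallocated queue into this batch.
      val blocks = unallocated.map { case (id, q) => (id, q.dequeueAll(_ => true).toSeq) }.toMap
      timeToBlocks(batchTime) = blocks
      lastAllocatedBatchTime = batchTime
    } else {
      // Recovery path: this batch was already allocated before the failure,
      // so re-allocating it is a no-op rather than an error.
      println(s"Batch $batchTime was possibly processed; replaying its existing allocation")
    }
  }

  def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[String]] =
    timeToBlocks.getOrElse(batchTime, Map.empty)

  def getUnallocatedBlocks(streamId: Int): Seq[String] =
    unallocated.get(streamId).map(_.toSeq).getOrElse(Seq.empty)
}

The real tracker additionally writes a BatchAllocationEvent to the WAL on the normal path, which is what makes the recovery path above reachable after a restart.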
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index de7e9d624b..fbb7b0bfeb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -82,15 +82,15 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.allocateBlocksToBatch(2)
receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
- // Verify that batch 2 cannot be allocated again
- intercept[SparkException] {
- receivedBlockTracker.allocateBlocksToBatch(2)
- }
+ // Verify that re-allocating an older batch is a no-op and
+ // returns the same blocks as previously allocated.
+ receivedBlockTracker.allocateBlocksToBatch(1)
+ receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
- // Verify that older batches cannot be allocated again
- intercept[SparkException] {
- receivedBlockTracker.allocateBlocksToBatch(1)
- }
+ blockInfos.map(receivedBlockTracker.addBlock)
+ receivedBlockTracker.allocateBlocksToBatch(2)
+ receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
+ receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}
test("block addition, block to batch allocation and cleanup with write ahead log") {
@@ -186,14 +186,14 @@ class ReceivedBlockTrackerSuite
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
}
-
+
test("enabling write ahead log but not setting checkpoint dir") {
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
intercept[SparkException] {
createTracker(setCheckpointDir = false)
}
}
-
+
test("setting checkpoint dir but not enabling write ahead log") {
// When WAL config is not set, log manager should not be enabled
val tracker1 = createTracker(setCheckpointDir = true)
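As a closing illustration, driving the SimpleBlockTracker sketch from above through the same sequence the updated test asserts (illustrative only, not the actual suite):

val tracker = new SimpleBlockTracker
tracker.addBlock(streamId = 0, blockId = "block-1")
tracker.allocateBlocksToBatch(1L)
assert(tracker.getBlocksOfBatch(1L) == Map(0 -> Seq("block-1")))

tracker.allocateBlocksToBatch(2L)  // nothing pending, so batch 2 is empty
assert(tracker.getBlocksOfBatch(2L).values.forall(_.isEmpty))

tracker.allocateBlocksToBatch(1L)  // older batch: logged no-op, allocation unchanged
assert(tracker.getBlocksOfBatch(1L) == Map(0 -> Seq("block-1")))

tracker.addBlock(0, "block-2")     // arrives after batch 2 was already allocated
tracker.allocateBlocksToBatch(2L)  // no-op again, so "block-2" is not lost...
assert(tracker.getUnallocatedBlocks(0) == Seq("block-2")) // ...it waits for the next batch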