aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-02-01 11:02:17 -0800
committerAndrew Or <andrew@databricks.com>2016-02-01 11:02:17 -0800
commit6075573a93176ee8c071888e4525043d9e73b061 (patch)
tree45cdc80c2f00b52ac5b5f4aaabb04e3e822557fe /streaming/src/main
parentc1da4d421ab78772ffa52ad46e5bdfb4e5268f47 (diff)
downloadspark-6075573a93176ee8c071888e4525043d9e73b061.tar.gz
spark-6075573a93176ee8c071888e4525043d9e73b061.tar.bz2
spark-6075573a93176ee8c071888e4525043d9e73b061.zip
[SPARK-6847][CORE][STREAMING] Fix stack overflow issue when updateStateByKey is followed by a checkpointed dstream
Add a local property to indicate if checkpointing all RDDs that are marked with the checkpoint flag, and enable it in Streaming Author: Shixiong Zhu <shixiong@databricks.com> Closes #10934 from zsxwing/recursive-checkpoint.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala7
2 files changed, 10 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index a5a01e7763..a3ad5eaa40 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.scheduler
import scala.util.{Failure, Success, Try}
import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
@@ -243,6 +244,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
+
+ // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
+ // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
+ ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9535c8e5b7..3fed3d8835 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -23,10 +23,10 @@ import scala.collection.JavaConverters._
import scala.util.Failure
import org.apache.spark.Logging
-import org.apache.spark.rdd.PairRDDFunctions
+import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}
+import org.apache.spark.util.{EventLoop, ThreadUtils}
private[scheduler] sealed trait JobSchedulerEvent
@@ -210,6 +210,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+ // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
+ // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
+ ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
// We need to assign `eventLoop` to a temp variable. Otherwise, because
// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then