author    Tathagata Das <tathagata.das1565@gmail.com>  2015-11-24 23:13:01 -0800
committer Shixiong Zhu <shixiong@databricks.com>  2015-11-24 23:13:01 -0800
commit    2169886883d33b33acf378ac42a626576b342df1 (patch)
tree      c1b94aab923d9d5d605f940dc489a19518249534 /streaming/src/main
parent    151d7c2baf18403e6e59e97c80c8bcded6148038 (diff)
[SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and recovered from checkpoint file
This solves the following exception, raised when an empty state RDD is checkpointed and recovered. The root cause is that an empty OpenHashMapBasedStateMap cannot be deserialized, because its initialCapacity is set to zero.
```
Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 20, localhost): java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
	at scala.Predef$.require(Predef.scala:233)
	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.<init>(StateMap.scala:96)
	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.<init>(StateMap.scala:86)
	at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.readObject(StateMap.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
```

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #9958 from tdas/SPARK-11979.
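For context, the essence of the fix is to clamp the deserialized size hint to a sensible minimum before allocating the map, since an OpenHashMap-style structure requires a capacity of at least 1. The following is a minimal, self-contained sketch of that pattern; the object and method names (SizeHintClampSketch, initialCapacityFor) are illustrative and not part of the actual StateMap code:

```scala
object SizeHintClampSketch {
  // Mirrors the DEFAULT_INITIAL_CAPACITY constant added by the patch.
  val DefaultInitialCapacity = 64

  /** Clamp a (possibly zero) size hint read from a checkpoint to a valid capacity. */
  def initialCapacityFor(sizeHintFromCheckpoint: Int): Int = {
    val capacity = math.max(sizeHintFromCheckpoint, DefaultInitialCapacity)
    require(capacity >= 1, "Invalid initial capacity")
    capacity
  }

  def main(args: Array[String]): Unit = {
    // An empty state map writes a size hint of 0; allocating an OpenHashMap with
    // capacity 0 is what raised "requirement failed: Invalid initial capacity".
    println(initialCapacityFor(0))    // 64: an empty checkpointed map recovers fine
    println(initialCapacityFor(1000)) // 1000: larger hints are still honored
  }
}
```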
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala | 19
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
index 34287c3e00..3f139ad138 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
@@ -59,7 +59,7 @@ private[streaming] object StateMap {
def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
DELTA_CHAIN_LENGTH_THRESHOLD)
- new OpenHashMapBasedStateMap[K, S](64, deltaChainThreshold)
+ new OpenHashMapBasedStateMap[K, S](deltaChainThreshold)
}
}
@@ -79,7 +79,7 @@ private[streaming] class EmptyStateMap[K: ClassTag, S: ClassTag] extends StateMa
/** Implementation of StateMap based on Spark's [[org.apache.spark.util.collection.OpenHashMap]] */
private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: ClassTag](
@transient @volatile var parentStateMap: StateMap[K, S],
- initialCapacity: Int = 64,
+ initialCapacity: Int = DEFAULT_INITIAL_CAPACITY,
deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD
) extends StateMap[K, S] { self =>
@@ -89,12 +89,14 @@ private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: ClassTag](
deltaChainThreshold = deltaChainThreshold)
def this(deltaChainThreshold: Int) = this(
- initialCapacity = 64, deltaChainThreshold = deltaChainThreshold)
+ initialCapacity = DEFAULT_INITIAL_CAPACITY, deltaChainThreshold = deltaChainThreshold)
def this() = this(DELTA_CHAIN_LENGTH_THRESHOLD)
- @transient @volatile private var deltaMap =
- new OpenHashMap[K, StateInfo[S]](initialCapacity)
+ require(initialCapacity >= 1, "Invalid initial capacity")
+ require(deltaChainThreshold >= 1, "Invalid delta chain threshold")
+
+ @transient @volatile private var deltaMap = new OpenHashMap[K, StateInfo[S]](initialCapacity)
/** Get the session data if it exists */
override def get(key: K): Option[S] = {
@@ -284,9 +286,10 @@ private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: ClassTag](
// Read the data of the parent map. Keep reading records, until the limiter is reached
// First read the approximate number of records to expect and allocate properly size
// OpenHashMap
- val parentSessionStoreSizeHint = inputStream.readInt()
+ val parentStateMapSizeHint = inputStream.readInt()
+ val newStateMapInitialCapacity = math.max(parentStateMapSizeHint, DEFAULT_INITIAL_CAPACITY)
val newParentSessionStore = new OpenHashMapBasedStateMap[K, S](
- initialCapacity = parentSessionStoreSizeHint, deltaChainThreshold)
+ initialCapacity = newStateMapInitialCapacity, deltaChainThreshold)
// Read the records until the limit marking object has been reached
var parentSessionLoopDone = false
@@ -338,4 +341,6 @@ private[streaming] object OpenHashMapBasedStateMap {
class LimitMarker(val num: Int) extends Serializable
val DELTA_CHAIN_LENGTH_THRESHOLD = 20
+
+ val DEFAULT_INITIAL_CAPACITY = 64
}
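As a side note, the new require guards mean that an invalid capacity now fails fast at construction time with a descriptive message, instead of surfacing later as a deserialization error inside a task. Below is a minimal sketch of the same guard pattern under assumed names (GuardedStateMap is an illustrative stand-in, not the real StateMap API):

```scala
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.Try

/** Illustrative stand-in for a capacity-checked state map; not the real StateMap. */
class GuardedStateMap[K: ClassTag, S: ClassTag](
    initialCapacity: Int = 64,
    deltaChainThreshold: Int = 20) {
  // Same guard pattern as the patch: reject invalid parameters up front.
  require(initialCapacity >= 1, "Invalid initial capacity")
  require(deltaChainThreshold >= 1, "Invalid delta chain threshold")

  private val data = mutable.Map.empty[K, S]
  def put(key: K, state: S): Unit = data.put(key, state)
  def get(key: K): Option[S] = data.get(key)
}

object GuardedStateMapDemo extends App {
  // Defaults satisfy the guards.
  new GuardedStateMap[String, Int]()
  // A zero capacity now fails immediately with a clear message.
  println(Try(new GuardedStateMap[String, Int](initialCapacity = 0)))
  // Failure(java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity)
}
```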