aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala10
1 files changed, 7 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
index 8119d808ff..58b7031d5e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
@@ -84,15 +84,19 @@ private[streaming] object MapWithStateRDDRecord {
* RDD, and a partitioned keyed-data RDD
*/
private[streaming] class MapWithStateRDDPartition(
- idx: Int,
+ override val index: Int,
@transient private var prevStateRDD: RDD[_],
@transient private var partitionedDataRDD: RDD[_]) extends Partition {
private[rdd] var previousSessionRDDPartition: Partition = null
private[rdd] var partitionedDataRDDPartition: Partition = null
- override def index: Int = idx
- override def hashCode(): Int = idx
+ override def hashCode(): Int = index
+
+ override def equals(other: Any): Boolean = other match {
+ case that: MapWithStateRDDPartition => index == that.index
+ case _ => false
+ }
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {