author     Vinod K C <vinod.kc@huawei.com>    2015-09-03 13:54:58 -0700
committer  Andrew Or <andrew@databricks.com>  2015-09-03 13:55:02 -0700
commit     11ef32c5a1fad84574733ade1e9d50a94275842b (patch)
tree       757ee53156ec0c77d96f3287e2d9a03cbb7d2c27 /core
parent     e62f4a46f4396ae1e064e3d2ebfa2434f549b090 (diff)
download   spark-11ef32c5a1fad84574733ade1e9d50a94275842b.tar.gz
           spark-11ef32c5a1fad84574733ade1e9d50a94275842b.tar.bz2
           spark-11ef32c5a1fad84574733ade1e9d50a94275842b.zip
[SPARK-10430] [CORE] Added hashCode methods in AccumulableInfo and RDDOperationScope
Author: Vinod K C <vinod.kc@huawei.com>

Closes #8581 from vinodkc/fix_RDDOperationScope_Hashcode.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala        3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala    8
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala   7
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  9
4 files changed, 26 insertions(+), 1 deletion(-)
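Both classes already override equals (this patch even extends AccumulableInfo's equals to cover the internal flag), but neither overrode hashCode, which breaks the Object contract that equal values must produce equal hash codes and makes them unreliable as keys in hash-based collections. A minimal standalone sketch of the problem, using a hypothetical Id class rather than the Spark types:

class Id(val v: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: Id => that.v == v
    case _ => false
  }
  // No hashCode override: the default identity-based hash is used.
}

val a = new Id(1)
val b = new Id(1)
assert(a == b)  // structurally equal via the overridden equals
// a.hashCode == b.hashCode is almost never true here, so a HashSet holding `a`
// will typically fail to find `b`; defining hashCode restores the contract,
// which is what this patch does for the two Spark classes.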
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
index 44667281c1..540cbd688b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude, JsonPropertyOr
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.google.common.base.Objects
import org.apache.spark.{Logging, SparkContext}
@@ -67,6 +68,8 @@ private[spark] class RDDOperationScope(
}
}
+ override def hashCode(): Int = Objects.hashCode(id, name, parent)
+
override def toString: String = toJson
}
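Objects.hashCode here is Guava's varargs helper, which delegates to java.util.Arrays.hashCode on the supplied fields. A rough standalone equivalent of what it computes (a sketch, not the Guava or Spark source):

// The standard 31-based fold over the fields, seeded with 1, null mapping to 0.
def guavaStyleHash(fields: Any*): Int =
  fields.foldLeft(1)((acc, f) => 31 * acc + (if (f == null) 0 else f.hashCode))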
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index 11d123eec4..b6bff64ee3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -34,9 +34,15 @@ class AccumulableInfo private[spark] (
override def equals(other: Any): Boolean = other match {
case acc: AccumulableInfo =>
this.id == acc.id && this.name == acc.name &&
- this.update == acc.update && this.value == acc.value
+ this.update == acc.update && this.value == acc.value &&
+ this.internal == acc.internal
case _ => false
}
+
+ override def hashCode(): Int = {
+ val state = Seq(id, name, update, value, internal)
+ state.map(_.hashCode).reduceLeft(31 * _ + _)
+ }
}
object AccumulableInfo {
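The hand-rolled hash above is the usual 31-based combinator expressed as a fold over the field hash codes; unlike the Arrays.hashCode-style fold, it seeds with the first field's hash rather than with 1. Desugared, it reads as follows (standalone sketch, with h1..h5 standing for the five field hash codes):

// What Seq(id, name, update, value, internal).map(_.hashCode).reduceLeft(31 * _ + _)
// computes for five fields:
def fold31(hashes: Seq[Int]): Int = hashes.reduceLeft(31 * _ + _)
// fold31(Seq(h1, h2, h3, h4, h5)) == (((h1 * 31 + h2) * 31 + h3) * 31 + h4) * 31 + h5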
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
index f65349e3e3..16a92f54f9 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -38,6 +38,13 @@ class RDDOperationScopeSuite extends SparkFunSuite with BeforeAndAfter {
sc.stop()
}
+ test("equals and hashCode") {
+ val opScope1 = new RDDOperationScope("scope1", id = "1")
+ val opScope2 = new RDDOperationScope("scope1", id = "1")
+ assert(opScope1 === opScope2)
+ assert(opScope1.hashCode() === opScope2.hashCode())
+ }
+
test("getAllScopes") {
assert(scope1.getAllScopes === Seq(scope1))
assert(scope2.getAllScopes === Seq(scope1, scope2))
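The assertions above check the contract directly; the practical payoff is that structurally equal scopes can now be found in hash-based collections. A hypothetical extra check along those lines, not part of the patch (it would have to sit inside org.apache.spark, since RDDOperationScope is private[spark]):

import scala.collection.mutable

val seen = mutable.HashSet(new RDDOperationScope("scope1", id = "1"))
// This lookup relies on hashCode agreeing with equals; with only equals
// overridden it would usually miss, because the two equal instances would
// land in different buckets.
assert(seen.contains(new RDDOperationScope("scope1", id = "1")))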
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 80f64de443..2c3aa2cf90 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -285,6 +285,15 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
+ test("equals and hashCode AccumulableInfo") {
+ val accInfo1 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, true)
+ val accInfo2 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
+ val accInfo3 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
+ assert(accInfo1 !== accInfo2)
+ assert(accInfo2 === accInfo3)
+ assert(accInfo2.hashCode() === accInfo3.hashCode())
+ }
+
test("cache location preferences w/ dependency") {
val baseRdd = new MyRDD(sc, 1, Nil).cache()
val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))