aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-01-04 01:20:09 -0800
committerAndrew Or <andrewor14@gmail.com>2014-01-04 01:20:09 -0800
commit2db7884f6f1939d2a62fb71279a3ad80706308e1 (patch)
treed191db889b202cee34c65f3896ba8bbbe4bed5db /core
parent4296d96c82881cde5832bd8f8a3b48eb9817a218 (diff)
downloadspark-2db7884f6f1939d2a62fb71279a3ad80706308e1.tar.gz
spark-2db7884f6f1939d2a62fb71279a3ad80706308e1.tar.bz2
spark-2db7884f6f1939d2a62fb71279a3ad80706308e1.zip
Address Mark's comments
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/Aggregator.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala15
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala8
3 files changed, 13 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index bb488f4ad8..292e32e7c8 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -50,8 +50,8 @@ case class Aggregator[K, V, C] (
val combiners =
new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
- val kv = iter.next()
- combiners.insert(kv._1, kv._2)
+ val (k, v) = iter.next()
+ combiners.insert(k, v)
}
combiners.iterator
}
@@ -72,8 +72,8 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
- val kc = iter.next()
- combiners.insert(kc._1, kc._2)
+ val (k, c) = iter.next()
+ combiners.insert(k, c)
}
combiners.iterator
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index d8fa7ed9af..6faaa3197f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -49,12 +49,13 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
// Triggered by destructiveSortedIterator; the underlying data array may no longer be used
private var destroyed = false
+ private val destructionMessage = "Map state is invalid from destructive sorting!"
private val LOAD_FACTOR = 0.7
/** Get the value for a given key */
def apply(key: K): V = {
- checkValidityOrThrowException()
+ assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
return nullValue
@@ -78,7 +79,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
/** Set the value for a key */
def update(key: K, value: V): Unit = {
- checkValidityOrThrowException()
+ assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
@@ -113,7 +114,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
* for key, if any, or null otherwise. Returns the newly updated value.
*/
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
- checkValidityOrThrowException()
+ assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
@@ -148,7 +149,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
/** Iterator method from Iterable */
override def iterator: Iterator[(K, V)] = {
- checkValidityOrThrowException()
+ assert(!destroyed, destructionMessage)
new Iterator[(K, V)] {
var pos = -1
@@ -287,10 +288,4 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
}
}
}
-
- private def checkValidityOrThrowException(): Unit = {
- if (destroyed) {
- throw new IllegalStateException("Map state is invalid from destructive sorting!")
- }
- }
}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
index 71b936b0df..f44442f1a5 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
@@ -190,9 +190,9 @@ class AppendOnlyMapSuite extends FunSuite {
}
// All subsequent calls to apply, update, changeValue and iterator should throw exception
- intercept[IllegalStateException] { map.apply("1") }
- intercept[IllegalStateException] { map.update("1", "2013") }
- intercept[IllegalStateException] { map.changeValue("1", (hadValue, oldValue) => "2014") }
- intercept[IllegalStateException] { map.iterator }
+ intercept[AssertionError] { map.apply("1") }
+ intercept[AssertionError] { map.update("1", "2013") }
+ intercept[AssertionError] { map.changeValue("1", (hadValue, oldValue) => "2014") }
+ intercept[AssertionError] { map.iterator }
}
}