From 0e40cfabf867469f988979decd9981adc03c90b3 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Wed, 14 Aug 2013 11:45:21 -0700
Subject: Fix some review comments

---
 .../main/scala/org/apache/spark/Aggregator.scala   | 19 +++++++--------
 .../scala/org/apache/spark/rdd/CoGroupedRDD.scala  |  2 --
 core/src/main/scala/spark/util/AppendOnlyMap.scala | 27 ++++++----------------
 .../test/scala/spark/util/AppendOnlyMapSuite.scala | 23 ++++++++++++++----
 4 files changed, 33 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index fa1419df18..84e15fc0c8 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,18 +17,15 @@
 
 package org.apache.spark
 
-import java.util.{HashMap => JHashMap}
+import org.apache.spark.util.AppendOnlyMap
 
-import scala.collection.JavaConversions._
-
-import spark.util.AppendOnlyMap
-
-/** A set of functions used to aggregate data.
- *
- * @param createCombiner function to create the initial value of the aggregation.
- * @param mergeValue function to merge a new value into the aggregation result.
- * @param mergeCombiners function to merge outputs from multiple mergeValue function.
- */
+/**
+ * A set of functions used to aggregate data.
+ *
+ * @param createCombiner function to create the initial value of the aggregation.
+ * @param mergeValue function to merge a new value into the aggregation result.
+ * @param mergeCombiners function to merge outputs from multiple mergeValue function.
+ */
 case class Aggregator[K, V, C] (
     createCombiner: V => C,
     mergeValue: (C, V) => C,
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index f6dd8a65cb..f41a023bc1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -18,9 +18,7 @@
 package org.apache.spark.rdd
 
 import java.io.{ObjectOutputStream, IOException}
-import java.util.{HashMap => JHashMap}
 
-import scala.collection.JavaConversions
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{Partition, Partitioner, SparkEnv, TaskContext}
diff --git a/core/src/main/scala/spark/util/AppendOnlyMap.scala b/core/src/main/scala/spark/util/AppendOnlyMap.scala
index 416b93ea41..a7a8625c92 100644
--- a/core/src/main/scala/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/spark/util/AppendOnlyMap.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
 */
 
-package spark.util
+package org.apache.spark.util
 
 /**
  * A simple open hash table optimized for the append-only use case, where keys
@@ -29,14 +29,10 @@ package spark.util
  */
 private[spark]
 class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
-  if (!isPowerOf2(initialCapacity)) {
-    throw new IllegalArgumentException("Initial capacity must be power of 2")
-  }
-  if (initialCapacity >= (1 << 30)) {
-    throw new IllegalArgumentException("Can't make capacity bigger than 2^29 elements")
-  }
+  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+  require(initialCapacity >= 1, "Invalid initial capacity")
 
-  private var capacity = initialCapacity
+  private var capacity = nextPowerOf2(initialCapacity)
   private var curSize = 0
 
   // Holds keys and values in the same array for memory locality; specifically, the order of
@@ -225,17 +221,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     capacity = newCapacity
   }
 
-  private def isPowerOf2(num: Int): Boolean = {
-    var n = num
-    while (n > 0) {
-      if (n == 1) {
-        return true
-      } else if (n % 2 == 1) {
-        return false
-      } else {
-        n /= 2
-      }
-    }
-    return false
+  private def nextPowerOf2(n: Int): Int = {
+    val highBit = Integer.highestOneBit(n)
+    if (highBit == n) n else highBit << 1
   }
 }
diff --git a/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala b/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
index d1e36781ef..7177919a58 100644
--- a/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package spark.util
+package org.apache.spark.util
 
 import scala.collection.mutable.HashSet
 
@@ -25,16 +25,18 @@ class AppendOnlyMapSuite extends FunSuite {
   test("initialization") {
     val goodMap1 = new AppendOnlyMap[Int, Int](1)
     assert(goodMap1.size === 0)
-    val goodMap2 = new AppendOnlyMap[Int, Int](256)
+    val goodMap2 = new AppendOnlyMap[Int, Int](255)
     assert(goodMap2.size === 0)
+    val goodMap3 = new AppendOnlyMap[Int, Int](256)
+    assert(goodMap3.size === 0)
     intercept[IllegalArgumentException] {
-      new AppendOnlyMap[Int, Int](255) // Invalid map size: not power of 2
+      new AppendOnlyMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
     }
     intercept[IllegalArgumentException] {
-      new AppendOnlyMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
+      new AppendOnlyMap[Int, Int](-1)
     }
     intercept[IllegalArgumentException] {
-      new AppendOnlyMap[Int, Int](-1) // Invalid map size: not power of 2
+      new AppendOnlyMap[Int, Int](0)
     }
   }
@@ -138,4 +140,15 @@ class AppendOnlyMapSuite extends FunSuite {
     })
     assert(map.size === 401)
   }
+
+  test("inserting in capacity-1 map") {
+    val map = new AppendOnlyMap[String, String](1)
+    for (i <- 1 to 100) {
+      map("" + i) = "" + i
+    }
+    assert(map.size === 100)
+    for (i <- 1 to 100) {
+      assert(map("" + i) === "" + i)
+    }
+  }
 }
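
A note on the nextPowerOf2 helper above: Integer.highestOneBit(n) returns the
largest power of two that is <= n (for n >= 1), so the helper keeps n when it
is already a power of two and otherwise doubles that high bit to round up.
This is why the constructor can now accept any capacity in [1, 2^29] instead
of rejecting non-powers of two. The sketch below restates the patch's logic in
a standalone runnable form; the object name and check values are illustrative,
not part of the commit:

    object NextPowerOf2Check {
      // Same rounding logic as the private nextPowerOf2 in AppendOnlyMap;
      // the bounds check mirrors the require calls the patch adds to the
      // constructor (1 <= n <= 2^29).
      def nextPowerOf2(n: Int): Int = {
        require(n >= 1 && n <= (1 << 29), "capacity out of range")
        val highBit = Integer.highestOneBit(n) // largest power of two <= n
        if (highBit == n) n else highBit << 1  // already a power of two? keep n
      }

      def main(args: Array[String]): Unit = {
        assert(nextPowerOf2(1) == 1)      // why the new capacity-1 test is valid
        assert(nextPowerOf2(255) == 256)  // 255 used to throw; now it rounds up
        assert(nextPowerOf2(256) == 256)  // exact powers of two pass through
        assert(nextPowerOf2(1 << 29) == (1 << 29)) // largest legal capacity
        println("nextPowerOf2 checks passed")
      }
    }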
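
For readers unfamiliar with AppendOnlyMap, here is a minimal usage sketch
based only on the API the suite exercises (update with map(k) = v, lookup with
map(k), size, and iteration via Iterable[(K, V)]); the object name and example
values are made up for illustration:

    import org.apache.spark.util.AppendOnlyMap

    object AppendOnlyMapUsage {
      def main(args: Array[String]): Unit = {
        // Any capacity from 1 to 2^29 is accepted; a request for 100 is
        // rounded up to 128 internally by nextPowerOf2.
        val map = new AppendOnlyMap[String, Int](100)
        map("apple") = 1
        map("banana") = 2
        map("apple") = 3 // append-only means keys are never removed; values
                         // can still be overwritten
        assert(map("apple") == 3)
        assert(map.size == 2)
        // AppendOnlyMap extends Iterable[(K, V)], so it can be traversed
        // like any other Scala collection.
        for ((k, v) <- map) {
          println(k + " -> " + v)
        }
      }
    }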