author     Matei Zaharia <matei@eecs.berkeley.edu>  2013-08-14 11:45:21 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2013-10-08 23:16:16 -0700
commit     0e40cfabf867469f988979decd9981adc03c90b3 (patch)
tree       c411ed73cba1a49558adf4724cbf16bc53416471 /core
parent     b535db7d89abd59713ce83ae937d06193a04441e (diff)
Fix some review comments
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala       | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala |  2
-rw-r--r--  core/src/main/scala/spark/util/AppendOnlyMap.scala          | 27
-rw-r--r--  core/src/test/scala/spark/util/AppendOnlyMapSuite.scala     | 23
4 files changed, 33 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index fa1419df18..84e15fc0c8 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,18 +17,15 @@
package org.apache.spark
-import java.util.{HashMap => JHashMap}
+import org.apache.spark.util.AppendOnlyMap
-import scala.collection.JavaConversions._
-
-import spark.util.AppendOnlyMap
-
-/** A set of functions used to aggregate data.
- *
- * @param createCombiner function to create the initial value of the aggregation.
- * @param mergeValue function to merge a new value into the aggregation result.
- * @param mergeCombiners function to merge outputs from multiple mergeValue function.
- */
+/**
+ * A set of functions used to aggregate data.
+ *
+ * @param createCombiner function to create the initial value of the aggregation.
+ * @param mergeValue function to merge a new value into the aggregation result.
+ * @param mergeCombiners function to merge outputs from multiple mergeValue functions.
+ */
case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
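
For reference, the three functions documented above compose like a fold: createCombiner seeds the per-key state from the first value, mergeValue folds further values into that state, and mergeCombiners merges partial results built on different partitions. A minimal sketch (illustrative, not part of this patch) of an Aggregator that averages values per key, where the combiner type and all value names are assumptions:

  // Combiner is a (sum, count) pair; averaging is finished by sum / count.
  val avg = Aggregator[String, Double, (Double, Int)](
    createCombiner = v => (v, 1),                          // seed state with the first value
    mergeValue = (c, v) => (c._1 + v, c._2 + 1),           // fold another value into the state
    mergeCombiners = (a, b) => (a._1 + b._1, a._2 + b._2)  // merge two partial results
  )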
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index f6dd8a65cb..f41a023bc1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -18,9 +18,7 @@
package org.apache.spark.rdd
import java.io.{ObjectOutputStream, IOException}
-import java.util.{HashMap => JHashMap}
-import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Partition, Partitioner, SparkEnv, TaskContext}
diff --git a/core/src/main/scala/spark/util/AppendOnlyMap.scala b/core/src/main/scala/spark/util/AppendOnlyMap.scala
index 416b93ea41..a7a8625c92 100644
--- a/core/src/main/scala/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/spark/util/AppendOnlyMap.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.util
+package org.apache.spark.util
/**
* A simple open hash table optimized for the append-only use case, where keys
@@ -29,14 +29,10 @@ package spark.util
*/
private[spark]
class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
- if (!isPowerOf2(initialCapacity)) {
- throw new IllegalArgumentException("Initial capacity must be power of 2")
- }
- if (initialCapacity >= (1 << 30)) {
- throw new IllegalArgumentException("Can't make capacity bigger than 2^29 elements")
- }
+ require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+ require(initialCapacity >= 1, "Invalid initial capacity")
- private var capacity = initialCapacity
+ private var capacity = nextPowerOf2(initialCapacity)
private var curSize = 0
// Holds keys and values in the same array for memory locality; specifically, the order of
@@ -225,17 +221,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
capacity = newCapacity
}
- private def isPowerOf2(num: Int): Boolean = {
- var n = num
- while (n > 0) {
- if (n == 1) {
- return true
- } else if (n % 2 == 1) {
- return false
- } else {
- n /= 2
- }
- }
- return false
+ private def nextPowerOf2(n: Int): Int = {
+ val highBit = Integer.highestOneBit(n)
+ if (highBit == n) n else highBit << 1
}
}
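
The new nextPowerOf2 helper is what lets any positive initial capacity through the relaxed require checks: Integer.highestOneBit(n) isolates the top set bit, so a power of two is returned unchanged and anything else is rounded up to the next one. A small worked sketch mirroring the code above:

  nextPowerOf2(1)   // highestOneBit(1) == 1, equal to n     -> 1
  nextPowerOf2(255) // highestOneBit(255) == 128, doubled    -> 256
  nextPowerOf2(256) // highestOneBit(256) == 256, equal to n -> 256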
diff --git a/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala b/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
index d1e36781ef..7177919a58 100644
--- a/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/spark/util/AppendOnlyMapSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.util
+package org.apache.spark.util
import scala.collection.mutable.HashSet
@@ -25,16 +25,18 @@ class AppendOnlyMapSuite extends FunSuite {
test("initialization") {
val goodMap1 = new AppendOnlyMap[Int, Int](1)
assert(goodMap1.size === 0)
- val goodMap2 = new AppendOnlyMap[Int, Int](256)
+ val goodMap2 = new AppendOnlyMap[Int, Int](255)
assert(goodMap2.size === 0)
+ val goodMap3 = new AppendOnlyMap[Int, Int](256)
+ assert(goodMap3.size === 0)
intercept[IllegalArgumentException] {
- new AppendOnlyMap[Int, Int](255) // Invalid map size: not power of 2
+ new AppendOnlyMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
}
intercept[IllegalArgumentException] {
- new AppendOnlyMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
+ new AppendOnlyMap[Int, Int](-1)
}
intercept[IllegalArgumentException] {
- new AppendOnlyMap[Int, Int](-1) // Invalid map size: not power of 2
+ new AppendOnlyMap[Int, Int](0)
}
}
@@ -138,4 +140,15 @@ class AppendOnlyMapSuite extends FunSuite {
})
assert(map.size === 401)
}
+
+ test("inserting in capacity-1 map") {
+ val map = new AppendOnlyMap[String, String](1)
+ for (i <- 1 to 100) {
+ map("" + i) = "" + i
+ }
+ assert(map.size === 100)
+ for (i <- 1 to 100) {
+ assert(map("" + i) === "" + i)
+ }
+ }
}
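
Taken together with the relaxed require checks, the map can now be constructed with any capacity in [1, 2^29] and is used as in the tests above. A minimal usage sketch (assumes the calling code lives under the org.apache.spark package, since AppendOnlyMap is private[spark]):

  import org.apache.spark.util.AppendOnlyMap

  val map = new AppendOnlyMap[String, String](3) // capacity rounded up to 4
  map("a") = "1"                                 // insert via update
  map("a")                                       // lookup via apply -> "1"
  map.size                                       // -> 1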