aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-07-25 00:32:32 -0700
committerMatei Zaharia <matei@databricks.com>2014-07-25 00:32:32 -0700
commit8529ced35c6b77a384d10a26b654a8073d57e03d (patch)
treea657574e398bf1c0b83cde30cc72b8bbdf6b3faf /core
parent2f75a4a30e1a3fdf384475b9660c6c43f093f68c (diff)
downloadspark-8529ced35c6b77a384d10a26b654a8073d57e03d.tar.gz
spark-8529ced35c6b77a384d10a26b654a8073d57e03d.tar.bz2
spark-8529ced35c6b77a384d10a26b654a8073d57e03d.zip
SPARK-2657 Use more compact data structures than ArrayBuffer in groupBy & cogroup
JIRA: https://issues.apache.org/jira/browse/SPARK-2657 Our current code uses ArrayBuffers for each group of values in groupBy, as well as for the key's elements in CoGroupedRDD. ArrayBuffers have a lot of overhead if there are few values in them, which is likely to happen in cases such as join. In particular, they have a pointer to an Object[] of size 16 by default, which is 24 bytes for the array header + 128 for the pointers in there, plus at least 32 for the ArrayBuffer data structure. This patch replaces the per-group buffers with a CompactBuffer class that can store up to 2 elements more efficiently (in fields of itself) and acts like an ArrayBuffer beyond that. For a key's elements in CoGroupedRDD, we use an Array of CompactBuffers instead of an ArrayBuffer of ArrayBuffers. There are some changes throughout the code to deal with CoGroupedRDD returning Array instead. We can also decide not to do that but CoGroupedRDD is a `DeveloperAPI` so I think it's okay to change it here. Author: Matei Zaharia <matei@databricks.com> Closes #1555 from mateiz/compact-groupby and squashes the following commits: 845a356 [Matei Zaharia] Lower initial size of CompactBuffer's vector to 8 07621a7 [Matei Zaharia] Review comments 0c1cd12 [Matei Zaharia] Don't use varargs in CompactBuffer.apply bdc8a39 [Matei Zaharia] Small tweak to +=, and typos f61f040 [Matei Zaharia] Fix line lengths 59da88b0 [Matei Zaharia] Fix line lengths 197cde8 [Matei Zaharia] Make CompactBuffer extend Seq to make its toSeq more efficient 775110f [Matei Zaharia] Change CoGroupedRDD to give (K, Array[Iterable[_]]) to avoid wrappers 9b4c6e8 [Matei Zaharia] Use CompactBuffer in CoGroupedRDD ed577ab [Matei Zaharia] Use CompactBuffer in groupByKey 10f0de1 [Matei Zaharia] A CompactBuffer that's more memory-efficient than ArrayBuffer for small buffers
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala33
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala159
-rw-r--r--core/src/test/scala/org/apache/spark/CheckpointSuite.scala37
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala105
7 files changed, 321 insertions, 35 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index aca235a62a..7d96089e52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -25,7 +25,7 @@ import scala.language.existentials
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle
@@ -66,14 +66,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
*/
@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
- extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+ extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
// For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
// Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
// CoGroupValue is the intermediate state of each value before being merged in compute.
- private type CoGroup = ArrayBuffer[Any]
+ private type CoGroup = CompactBuffer[Any]
private type CoGroupValue = (Any, Int) // Int is dependency number
- private type CoGroupCombiner = Seq[CoGroup]
+ private type CoGroupCombiner = Array[CoGroup]
private var serializer: Option[Serializer] = None
@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner: Some[Partitioner] = Some(part)
- override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
+ override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
@@ -150,7 +150,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
getCombiner(kv._1)(depNum) += kv._2
}
}
- new InterruptibleIterator(context, map.iterator)
+ new InterruptibleIterator(context,
+ map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
} else {
val map = createExternalMap(numRdds)
rddIterators.foreach { case (it, depNum) =>
@@ -161,7 +162,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
}
context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
- new InterruptibleIterator(context, map.iterator)
+ new InterruptibleIterator(context,
+ map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index a6b9204672..c04d162a39 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -46,6 +46,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.collection.CompactBuffer
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -361,12 +362,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
- val createCombiner = (v: V) => ArrayBuffer(v)
- val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
- val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
- val bufs = combineByKey[ArrayBuffer[V]](
+ val createCombiner = (v: V) => CompactBuffer(v)
+ val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
+ val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
+ val bufs = combineByKey[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
- bufs.mapValues(_.toIterable)
+ bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
/**
@@ -571,11 +572,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
- cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
- (vs.asInstanceOf[Seq[V]],
- w1s.asInstanceOf[Seq[W1]],
- w2s.asInstanceOf[Seq[W2]],
- w3s.asInstanceOf[Seq[W3]])
+ cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
+ (vs.asInstanceOf[Iterable[V]],
+ w1s.asInstanceOf[Iterable[W1]],
+ w2s.asInstanceOf[Iterable[W2]],
+ w3s.asInstanceOf[Iterable[W3]])
}
}
@@ -589,8 +590,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
- cg.mapValues { case Seq(vs, w1s) =>
- (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
+ cg.mapValues { case Array(vs, w1s) =>
+ (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}
@@ -604,10 +605,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
- cg.mapValues { case Seq(vs, w1s, w2s) =>
- (vs.asInstanceOf[Seq[V]],
- w1s.asInstanceOf[Seq[W1]],
- w2s.asInstanceOf[Seq[W2]])
+ cg.mapValues { case Array(vs, w1s, w2s) =>
+ (vs.asInstanceOf[Iterable[V]],
+ w1s.asInstanceOf[Iterable[W1]],
+ w2s.asInstanceOf[Iterable[W2]])
}
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c3a3e90a34..fa79b25759 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -31,6 +31,7 @@ import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.CompactBuffer
import scala.reflect.ClassTag
@@ -185,6 +186,7 @@ private[serializer] object KryoSerializer {
classOf[GotBlock],
classOf[GetBlock],
classOf[MapStatus],
+ classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
classOf[BoundedPriorityQueue[_]],
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
new file mode 100644
index 0000000000..d44e15e3c9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+/**
+ * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
+ * ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
+ * so it has about 80-100 bytes of overhead. In contrast, CompactBuffer can keep up to two
+ * elements in fields of the main object, and only allocates an Array[AnyRef] if there are more
+ * entries than that. This makes it more efficient for operations like groupBy where we expect
+ * some keys to have very few elements.
+ */
+private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
+ // First two elements
+ private var element0: T = _
+ private var element1: T = _
+
+ // Number of elements, including our two in the main object
+ private var curSize = 0
+
+ // Array for extra elements
+ private var otherElements: Array[AnyRef] = null
+
+ def apply(position: Int): T = {
+ if (position < 0 || position >= curSize) {
+ throw new IndexOutOfBoundsException
+ }
+ if (position == 0) {
+ element0
+ } else if (position == 1) {
+ element1
+ } else {
+ otherElements(position - 2).asInstanceOf[T]
+ }
+ }
+
+ private def update(position: Int, value: T): Unit = {
+ if (position < 0 || position >= curSize) {
+ throw new IndexOutOfBoundsException
+ }
+ if (position == 0) {
+ element0 = value
+ } else if (position == 1) {
+ element1 = value
+ } else {
+ otherElements(position - 2) = value.asInstanceOf[AnyRef]
+ }
+ }
+
+ def += (value: T): CompactBuffer[T] = {
+ val newIndex = curSize
+ if (newIndex == 0) {
+ element0 = value
+ curSize = 1
+ } else if (newIndex == 1) {
+ element1 = value
+ curSize = 2
+ } else {
+ growToSize(curSize + 1)
+ otherElements(newIndex - 2) = value.asInstanceOf[AnyRef]
+ }
+ this
+ }
+
+ def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
+ values match {
+ // Optimize merging of CompactBuffers, used in cogroup and groupByKey
+ case compactBuf: CompactBuffer[T] =>
+ val oldSize = curSize
+ // Copy the other buffer's size and elements to local variables in case it is equal to us
+ val itsSize = compactBuf.curSize
+ val itsElements = compactBuf.otherElements
+ growToSize(curSize + itsSize)
+ if (itsSize == 1) {
+ this(oldSize) = compactBuf.element0
+ } else if (itsSize == 2) {
+ this(oldSize) = compactBuf.element0
+ this(oldSize + 1) = compactBuf.element1
+ } else if (itsSize > 2) {
+ this(oldSize) = compactBuf.element0
+ this(oldSize + 1) = compactBuf.element1
+ // At this point our size is also above 2, so just copy its array directly into ours.
+ // Note that since we added two elements above, the index in this.otherElements that we
+ // should copy to is oldSize.
+ System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
+ }
+
+ case _ =>
+ values.foreach(e => this += e)
+ }
+ this
+ }
+
+ override def length: Int = curSize
+
+ override def size: Int = curSize
+
+ override def iterator: Iterator[T] = new Iterator[T] {
+ private var pos = 0
+ override def hasNext: Boolean = pos < curSize
+ override def next(): T = {
+ if (!hasNext) {
+ throw new NoSuchElementException
+ }
+ pos += 1
+ apply(pos - 1)
+ }
+ }
+
+ /** Increase our size to newSize and grow the backing array if needed. */
+ private def growToSize(newSize: Int): Unit = {
+ if (newSize < 0) {
+ throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
+ }
+ val capacity = if (otherElements != null) otherElements.length + 2 else 2
+ if (newSize > capacity) {
+ var newArrayLen = 8
+ while (newSize - 2 > newArrayLen) {
+ newArrayLen *= 2
+ if (newArrayLen == Int.MinValue) {
+ // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
+ // Note that we set the new array length to Int.MaxValue - 2 so that our capacity
+ // calculation above still gives a positive integer.
+ newArrayLen = Int.MaxValue - 2
+ }
+ }
+ val newArray = new Array[AnyRef](newArrayLen)
+ if (otherElements != null) {
+ System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
+ }
+ otherElements = newArray
+ }
+ curSize = newSize
+ }
+}
+
+private[spark] object CompactBuffer {
+ def apply[T](): CompactBuffer[T] = new CompactBuffer[T]
+
+ def apply[T](value: T): CompactBuffer[T] = {
+ val buf = new CompactBuffer[T]
+ buf += value
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index fc00458083..d1cb2d9d3a 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -156,15 +156,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("CoGroupedRDD") {
val longLineageRDD1 = generateFatPairRDD()
+
+ // Collect the RDD as sequences instead of arrays to enable equality tests in testRDD
+ val seqCollectFunc = (rdd: RDD[(Int, Array[Iterable[Int]])]) =>
+ rdd.map{case (p, a) => (p, a.toSeq)}.collect(): Any
+
testRDD(rdd => {
CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
- })
+ }, seqCollectFunc)
val longLineageRDD2 = generateFatPairRDD()
testRDDPartitions(rdd => {
CheckpointSuite.cogroup(
longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
- })
+ }, seqCollectFunc)
}
test("ZippedPartitionsRDD") {
@@ -235,12 +240,19 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
assert(rdd.partitions.size === 0)
}
+ def defaultCollectFunc[T](rdd: RDD[T]): Any = rdd.collect()
+
/**
* Test checkpointing of the RDD generated by the given operation. It tests whether the
* serialized size of the RDD is reduce after checkpointing or not. This function should be called
* on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.).
+ *
+ * @param op an operation to run on the RDD
+ * @param collectFunc a function for collecting the values in the RDD, in case there are
+ * non-comparable types like arrays that we want to convert to something that supports ==
*/
- def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
+ def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U],
+ collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
// Generate the final RDD using given RDD operation
val baseRDD = generateFatRDD()
val operatedRDD = op(baseRDD)
@@ -258,13 +270,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
operatedRDD.checkpoint()
- val result = operatedRDD.collect()
+ val result = collectFunc(operatedRDD)
operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
// Test whether the checkpoint file has been created
- assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
+ assert(collectFunc(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get)) === result)
// Test whether dependencies have been changed from its earlier parent RDD
assert(operatedRDD.dependencies.head.rdd != parentRDD)
@@ -279,7 +291,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
assert(operatedRDD.partitions.length === numPartitions)
// Test whether the data in the checkpointed RDD is same as original
- assert(operatedRDD.collect() === result)
+ assert(collectFunc(operatedRDD) === result)
// Test whether serialized size of the RDD has reduced.
logInfo("Size of " + rddType +
@@ -289,7 +301,6 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
"Size of " + rddType + " did not reduce after checkpointing " +
" [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
)
-
}
/**
@@ -300,8 +311,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* This function should be called only those RDD whose partitions refer to parent RDD's
* partitions (i.e., do not call it on simple RDD like MappedRDD).
*
+ * @param op an operation to run on the RDD
+ * @param collectFunc a function for collecting the values in the RDD, in case there are
+ * non-comparable types like arrays that we want to convert to something that supports ==
*/
- def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
+ def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U],
+ collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
// Generate the final RDD using given RDD operation
val baseRDD = generateFatRDD()
val operatedRDD = op(baseRDD)
@@ -316,13 +331,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
parentRDDs.foreach(_.checkpoint()) // checkpoint the parent RDD, not the generated one
- val result = operatedRDD.collect() // force checkpointing
+ val result = collectFunc(operatedRDD) // force checkpointing
operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
// Test whether the data in the checkpointed RDD is same as original
- assert(operatedRDD.collect() === result)
+ assert(collectFunc(operatedRDD) === result)
// Test whether serialized size of the partitions has reduced
logInfo("Size of partitions of " + rddType +
@@ -436,7 +451,7 @@ object CheckpointSuite {
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part
- ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
+ ).asInstanceOf[RDD[(K, Array[Iterable[V]])]]
}
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 237e644b48..eae67c7747 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -176,7 +176,9 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
- val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
+ val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2))
+ .map(p => (p._1, p._2.map(_.toArray)))
+ .collectAsMap()
assert(results(1)(0).length === 3)
assert(results(1)(0).contains(1))
diff --git a/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala
new file mode 100644
index 0000000000..6c956d93dc
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import org.scalatest.FunSuite
+
+class CompactBufferSuite extends FunSuite {
+ test("empty buffer") {
+ val b = new CompactBuffer[Int]
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+ intercept[IndexOutOfBoundsException] { b(0) }
+ intercept[IndexOutOfBoundsException] { b(1) }
+ intercept[IndexOutOfBoundsException] { b(2) }
+ intercept[IndexOutOfBoundsException] { b(-1) }
+ }
+
+ test("basic inserts") {
+ val b = new CompactBuffer[Int]
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+ for (i <- 0 until 1000) {
+ b += i
+ assert(b.size === i + 1)
+ assert(b(i) === i)
+ }
+ assert(b.iterator.toList === (0 until 1000).toList)
+ assert(b.iterator.toList === (0 until 1000).toList)
+ assert(b.size === 1000)
+ }
+
+ test("adding sequences") {
+ val b = new CompactBuffer[Int]
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+
+ // Add some simple lists and iterators
+ b ++= List(0)
+ assert(b.size === 1)
+ assert(b.iterator.toList === List(0))
+ b ++= Iterator(1)
+ assert(b.size === 2)
+ assert(b.iterator.toList === List(0, 1))
+ b ++= List(2)
+ assert(b.size === 3)
+ assert(b.iterator.toList === List(0, 1, 2))
+ b ++= Iterator(3, 4, 5, 6, 7, 8, 9)
+ assert(b.size === 10)
+ assert(b.iterator.toList === (0 until 10).toList)
+
+ // Add CompactBuffers
+ val b2 = new CompactBuffer[Int]
+ b2 ++= 0 until 10
+ b ++= b2
+ assert(b.iterator.toList === (1 to 2).flatMap(i => 0 until 10).toList)
+ b ++= b2
+ assert(b.iterator.toList === (1 to 3).flatMap(i => 0 until 10).toList)
+ b ++= b2
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
+
+ // Add some small CompactBuffers as well
+ val b3 = new CompactBuffer[Int]
+ b ++= b3
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
+ b3 += 0
+ b ++= b3
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0))
+ b3 += 1
+ b ++= b3
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1))
+ b3 += 2
+ b ++= b3
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1, 0, 1, 2))
+ }
+
+ test("adding the same buffer to itself") {
+ val b = new CompactBuffer[Int]
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+ b += 1
+ assert(b.toList === List(1))
+ for (j <- 1 until 8) {
+ b ++= b
+ assert(b.size === (1 << j))
+ assert(b.iterator.toList === (1 to (1 << j)).map(i => 1).toList)
+ }
+ }
+}