 core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                              |  16
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                          |  33
 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala                     |   2
 core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala                 | 159
 core/src/test/scala/org/apache/spark/CheckpointSuite.scala                               |  37
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala                                  |   4
 core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala            | 105
 mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala   |   6
 streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala |  15
 9 files changed, 334 insertions(+), 43 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index aca235a62a..7d96089e52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -25,7 +25,7 @@ import scala.language.existentials
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle
@@ -66,14 +66,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
*/
@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
- extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+ extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
// For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
// Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
// CoGroupValue is the intermediate state of each value before being merged in compute.
- private type CoGroup = ArrayBuffer[Any]
+ private type CoGroup = CompactBuffer[Any]
private type CoGroupValue = (Any, Int) // Int is dependency number
- private type CoGroupCombiner = Seq[CoGroup]
+ private type CoGroupCombiner = Array[CoGroup]
private var serializer: Option[Serializer] = None
@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
override val partitioner: Some[Partitioner] = Some(part)
- override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
+ override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
@@ -150,7 +150,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
getCombiner(kv._1)(depNum) += kv._2
}
}
- new InterruptibleIterator(context, map.iterator)
+ new InterruptibleIterator(context,
+ map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
} else {
val map = createExternalMap(numRdds)
rddIterators.foreach { case (it, depNum) =>
@@ -161,7 +162,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
}
context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
- new InterruptibleIterator(context, map.iterator)
+ new InterruptibleIterator(context,
+ map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
}
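
[Reviewer note, not part of the patch] Why compute() now needs the asInstanceOf on the map iterator: the combiner type is Array[CompactBuffer[Any]], and Scala arrays are invariant, so it is not a subtype of Array[Iterable[_]] even though CompactBuffer extends Seq (and hence Iterable). The cast is safe at runtime because JVM arrays of reference types are covariant and the Iterator/Tuple2 type parameters are erased anyway. A minimal sketch (it assumes it is compiled inside the org.apache.spark packages, since CompactBuffer is private[spark]):

    import org.apache.spark.util.collection.CompactBuffer

    val combiner: Array[CompactBuffer[Any]] = Array(CompactBuffer[Any](1), CompactBuffer[Any](2))
    // val direct: Array[Iterable[_]] = combiner             // does not compile: Array is invariant
    val cast = combiner.asInstanceOf[Array[Iterable[_]]]      // succeeds at runtime
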
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index a6b9204672..c04d162a39 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -46,6 +46,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.collection.CompactBuffer
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -361,12 +362,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
- val createCombiner = (v: V) => ArrayBuffer(v)
- val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
- val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
- val bufs = combineByKey[ArrayBuffer[V]](
+ val createCombiner = (v: V) => CompactBuffer(v)
+ val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
+ val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
+ val bufs = combineByKey[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
- bufs.mapValues(_.toIterable)
+ bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
/**
@@ -571,11 +572,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
- cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
- (vs.asInstanceOf[Seq[V]],
- w1s.asInstanceOf[Seq[W1]],
- w2s.asInstanceOf[Seq[W2]],
- w3s.asInstanceOf[Seq[W3]])
+ cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
+ (vs.asInstanceOf[Iterable[V]],
+ w1s.asInstanceOf[Iterable[W1]],
+ w2s.asInstanceOf[Iterable[W2]],
+ w3s.asInstanceOf[Iterable[W3]])
}
}
@@ -589,8 +590,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
- cg.mapValues { case Seq(vs, w1s) =>
- (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
+ cg.mapValues { case Array(vs, w1s) =>
+ (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}
@@ -604,10 +605,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
- cg.mapValues { case Seq(vs, w1s, w2s) =>
- (vs.asInstanceOf[Seq[V]],
- w1s.asInstanceOf[Seq[W1]],
- w2s.asInstanceOf[Seq[W2]])
+ cg.mapValues { case Array(vs, w1s, w2s) =>
+ (vs.asInstanceOf[Iterable[V]],
+ w1s.asInstanceOf[Iterable[W1]],
+ w2s.asInstanceOf[Iterable[W2]])
}
}
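
[Reviewer note, not part of the patch] For readers less familiar with combineByKey, here is a small single-machine sketch of what the three functions above do, using a plain ArrayBuffer in place of CompactBuffer; the helper name localGroupByKey and the two-way split are made up for illustration:

    import scala.collection.mutable

    def localGroupByKey[K, V](pairs: Seq[(K, V)]): Map[K, Iterable[V]] = {
      val createCombiner = (v: V) => mutable.ArrayBuffer(v)
      val mergeValue = (buf: mutable.ArrayBuffer[V], v: V) => buf += v
      val mergeCombiners =
        (c1: mutable.ArrayBuffer[V], c2: mutable.ArrayBuffer[V]) => c1 ++= c2

      // Build one combiner map per simulated "partition", then merge the maps,
      // mirroring what combineByKey does across map and reduce tasks.
      def combine(part: Seq[(K, V)]): mutable.Map[K, mutable.ArrayBuffer[V]] = {
        val m = mutable.Map.empty[K, mutable.ArrayBuffer[V]]
        part.foreach { case (k, v) =>
          m.get(k) match {
            case Some(buf) => mergeValue(buf, v)
            case None => m(k) = createCombiner(v)
          }
        }
        m
      }

      val (p1, p2) = pairs.splitAt(pairs.length / 2)
      val merged = combine(p1)
      combine(p2).foreach { case (k, c2) =>
        merged.get(k) match {
          case Some(c1) => merged(k) = mergeCombiners(c1, c2)
          case None => merged(k) = c2
        }
      }
      merged.toMap
    }

For example, localGroupByKey(Seq(("a", 1), ("b", 2), ("a", 3))) gives Map(a -> ArrayBuffer(1, 3), b -> ArrayBuffer(2)).
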
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c3a3e90a34..fa79b25759 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -31,6 +31,7 @@ import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.CompactBuffer
import scala.reflect.ClassTag
@@ -185,6 +186,7 @@ private[serializer] object KryoSerializer {
classOf[GotBlock],
classOf[GetBlock],
classOf[MapStatus],
+ classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
classOf[BoundedPriorityQueue[_]],
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
new file mode 100644
index 0000000000..d44e15e3c9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+/**
+ * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
+ * ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
+ * so it has about 80-100 bytes of overhead. In contrast, CompactBuffer can keep up to two
+ * elements in fields of the main object, and only allocates an Array[AnyRef] if there are more
+ * entries than that. This makes it more efficient for operations like groupBy where we expect
+ * some keys to have very few elements.
+ */
+private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
+ // First two elements
+ private var element0: T = _
+ private var element1: T = _
+
+ // Number of elements, including our two in the main object
+ private var curSize = 0
+
+ // Array for extra elements
+ private var otherElements: Array[AnyRef] = null
+
+ def apply(position: Int): T = {
+ if (position < 0 || position >= curSize) {
+ throw new IndexOutOfBoundsException
+ }
+ if (position == 0) {
+ element0
+ } else if (position == 1) {
+ element1
+ } else {
+ otherElements(position - 2).asInstanceOf[T]
+ }
+ }
+
+ private def update(position: Int, value: T): Unit = {
+ if (position < 0 || position >= curSize) {
+ throw new IndexOutOfBoundsException
+ }
+ if (position == 0) {
+ element0 = value
+ } else if (position == 1) {
+ element1 = value
+ } else {
+ otherElements(position - 2) = value.asInstanceOf[AnyRef]
+ }
+ }
+
+ def += (value: T): CompactBuffer[T] = {
+ val newIndex = curSize
+ if (newIndex == 0) {
+ element0 = value
+ curSize = 1
+ } else if (newIndex == 1) {
+ element1 = value
+ curSize = 2
+ } else {
+ growToSize(curSize + 1)
+ otherElements(newIndex - 2) = value.asInstanceOf[AnyRef]
+ }
+ this
+ }
+
+ def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
+ values match {
+ // Optimize merging of CompactBuffers, used in cogroup and groupByKey
+ case compactBuf: CompactBuffer[T] =>
+ val oldSize = curSize
+ // Copy the other buffer's size and elements to local variables in case it is equal to us
+ val itsSize = compactBuf.curSize
+ val itsElements = compactBuf.otherElements
+ growToSize(curSize + itsSize)
+ if (itsSize == 1) {
+ this(oldSize) = compactBuf.element0
+ } else if (itsSize == 2) {
+ this(oldSize) = compactBuf.element0
+ this(oldSize + 1) = compactBuf.element1
+ } else if (itsSize > 2) {
+ this(oldSize) = compactBuf.element0
+ this(oldSize + 1) = compactBuf.element1
+ // At this point our size is also above 2, so just copy its array directly into ours.
+ // Note that since we added two elements above, the index in this.otherElements that we
+ // should copy to is oldSize.
+ System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
+ }
+
+ case _ =>
+ values.foreach(e => this += e)
+ }
+ this
+ }
+
+ override def length: Int = curSize
+
+ override def size: Int = curSize
+
+ override def iterator: Iterator[T] = new Iterator[T] {
+ private var pos = 0
+ override def hasNext: Boolean = pos < curSize
+ override def next(): T = {
+ if (!hasNext) {
+ throw new NoSuchElementException
+ }
+ pos += 1
+ apply(pos - 1)
+ }
+ }
+
+ /** Increase our size to newSize and grow the backing array if needed. */
+ private def growToSize(newSize: Int): Unit = {
+ if (newSize < 0) {
+ throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
+ }
+ val capacity = if (otherElements != null) otherElements.length + 2 else 2
+ if (newSize > capacity) {
+ var newArrayLen = 8
+ while (newSize - 2 > newArrayLen) {
+ newArrayLen *= 2
+ if (newArrayLen == Int.MinValue) {
+ // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
+ // Note that we set the new array length to Int.MaxValue - 2 so that our capacity
+ // calculation above still gives a positive integer.
+ newArrayLen = Int.MaxValue - 2
+ }
+ }
+ val newArray = new Array[AnyRef](newArrayLen)
+ if (otherElements != null) {
+ System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
+ }
+ otherElements = newArray
+ }
+ curSize = newSize
+ }
+}
+
+private[spark] object CompactBuffer {
+ def apply[T](): CompactBuffer[T] = new CompactBuffer[T]
+
+ def apply[T](value: T): CompactBuffer[T] = {
+ val buf = new CompactBuffer[T]
+ buf += value
+ }
+}
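
[Reviewer note, not part of the patch] A small sketch of the capacity rule growToSize implements (ignoring the Int overflow guard): the first two elements live in element0/element1, so the backing array only needs n - 2 slots; it starts at length 8 and doubles until that fits.

    // Hypothetical helper, for illustration only.
    def backingArrayLen(n: Int): Int = {
      if (n <= 2) 0                    // up to two elements: no array is allocated at all
      else {
        var len = 8
        while (n - 2 > len) len *= 2
        len
      }
    }
    // backingArrayLen(2) == 0, backingArrayLen(10) == 8, backingArrayLen(11) == 16

So a one- or two-element group carries only the two object fields, which is where the memory win over ArrayBuffer's default 16-slot array comes from.
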
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index fc00458083..d1cb2d9d3a 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -156,15 +156,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("CoGroupedRDD") {
val longLineageRDD1 = generateFatPairRDD()
+
+ // Collect the RDD as sequences instead of arrays to enable equality tests in testRDD
+ val seqCollectFunc = (rdd: RDD[(Int, Array[Iterable[Int]])]) =>
+ rdd.map{case (p, a) => (p, a.toSeq)}.collect(): Any
+
testRDD(rdd => {
CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
- })
+ }, seqCollectFunc)
val longLineageRDD2 = generateFatPairRDD()
testRDDPartitions(rdd => {
CheckpointSuite.cogroup(
longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
- })
+ }, seqCollectFunc)
}
test("ZippedPartitionsRDD") {
@@ -235,12 +240,19 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
assert(rdd.partitions.size === 0)
}
+ def defaultCollectFunc[T](rdd: RDD[T]): Any = rdd.collect()
+
/**
* Test checkpointing of the RDD generated by the given operation. It tests whether the
* serialized size of the RDD is reduce after checkpointing or not. This function should be called
* on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.).
+ *
+ * @param op an operation to run on the RDD
+ * @param collectFunc a function for collecting the values in the RDD, in case there are
+ * non-comparable types like arrays that we want to convert to something that supports ==
*/
- def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
+ def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U],
+ collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
// Generate the final RDD using given RDD operation
val baseRDD = generateFatRDD()
val operatedRDD = op(baseRDD)
@@ -258,13 +270,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
operatedRDD.checkpoint()
- val result = operatedRDD.collect()
+ val result = collectFunc(operatedRDD)
operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
// Test whether the checkpoint file has been created
- assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
+ assert(collectFunc(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get)) === result)
// Test whether dependencies have been changed from its earlier parent RDD
assert(operatedRDD.dependencies.head.rdd != parentRDD)
@@ -279,7 +291,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
assert(operatedRDD.partitions.length === numPartitions)
// Test whether the data in the checkpointed RDD is same as original
- assert(operatedRDD.collect() === result)
+ assert(collectFunc(operatedRDD) === result)
// Test whether serialized size of the RDD has reduced.
logInfo("Size of " + rddType +
@@ -289,7 +301,6 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
"Size of " + rddType + " did not reduce after checkpointing " +
" [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
)
-
}
/**
@@ -300,8 +311,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* This function should be called only those RDD whose partitions refer to parent RDD's
* partitions (i.e., do not call it on simple RDD like MappedRDD).
*
+ * @param op an operation to run on the RDD
+ * @param collectFunc a function for collecting the values in the RDD, in case there are
+ * non-comparable types like arrays that we want to convert to something that supports ==
*/
- def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
+ def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U],
+ collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
// Generate the final RDD using given RDD operation
val baseRDD = generateFatRDD()
val operatedRDD = op(baseRDD)
@@ -316,13 +331,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
parentRDDs.foreach(_.checkpoint()) // checkpoint the parent RDD, not the generated one
- val result = operatedRDD.collect() // force checkpointing
+ val result = collectFunc(operatedRDD) // force checkpointing
operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
// Test whether the data in the checkpointed RDD is same as original
- assert(operatedRDD.collect() === result)
+ assert(collectFunc(operatedRDD) === result)
// Test whether serialized size of the partitions has reduced
logInfo("Size of partitions of " + rddType +
@@ -436,7 +451,7 @@ object CheckpointSuite {
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part
- ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
+ ).asInstanceOf[RDD[(K, Array[Iterable[V]])]]
}
}
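
[Reviewer note, not part of the patch] A short illustration of why the test above collects with seqCollectFunc: the cogrouped values are now Array[Iterable[Int]] nested inside the collected tuples, and arrays compare by reference under ==, so two structurally identical results would not compare equal; mapping them to Seq first restores element-wise equality.

    val a = Array(Iterable(1, 2), Iterable(3))
    val b = Array(Iterable(1, 2), Iterable(3))
    assert(a != b)             // arrays compare by reference, not contents
    assert(a.toSeq == b.toSeq) // Seq comparison is element by element
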
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 237e644b48..eae67c7747 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -176,7 +176,9 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
- val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
+ val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2))
+ .map(p => (p._1, p._2.map(_.toArray)))
+ .collectAsMap()
assert(results(1)(0).length === 3)
assert(results(1)(0).contains(1))
diff --git a/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala
new file mode 100644
index 0000000000..6c956d93dc
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import org.scalatest.FunSuite
+
+class CompactBufferSuite extends FunSuite {
+ test("empty buffer") {
+ val b = new CompactBuffer[Int]
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+ intercept[IndexOutOfBoundsException] { b(0) }
+ intercept[IndexOutOfBoundsException] { b(1) }
+ intercept[IndexOutOfBoundsException] { b(2) }
+ intercept[IndexOutOfBoundsException] { b(-1) }
+ }
+
+ test("basic inserts") {
+ val b = new CompactBuffer[Int]
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+ for (i <- 0 until 1000) {
+ b += i
+ assert(b.size === i + 1)
+ assert(b(i) === i)
+ }
+ assert(b.iterator.toList === (0 until 1000).toList)
+ assert(b.iterator.toList === (0 until 1000).toList)
+ assert(b.size === 1000)
+ }
+
+ test("adding sequences") {
+ val b = new CompactBuffer[Int]
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+
+ // Add some simple lists and iterators
+ b ++= List(0)
+ assert(b.size === 1)
+ assert(b.iterator.toList === List(0))
+ b ++= Iterator(1)
+ assert(b.size === 2)
+ assert(b.iterator.toList === List(0, 1))
+ b ++= List(2)
+ assert(b.size === 3)
+ assert(b.iterator.toList === List(0, 1, 2))
+ b ++= Iterator(3, 4, 5, 6, 7, 8, 9)
+ assert(b.size === 10)
+ assert(b.iterator.toList === (0 until 10).toList)
+
+ // Add CompactBuffers
+ val b2 = new CompactBuffer[Int]
+ b2 ++= 0 until 10
+ b ++= b2
+ assert(b.iterator.toList === (1 to 2).flatMap(i => 0 until 10).toList)
+ b ++= b2
+ assert(b.iterator.toList === (1 to 3).flatMap(i => 0 until 10).toList)
+ b ++= b2
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
+
+ // Add some small CompactBuffers as well
+ val b3 = new CompactBuffer[Int]
+ b ++= b3
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
+ b3 += 0
+ b ++= b3
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0))
+ b3 += 1
+ b ++= b3
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1))
+ b3 += 2
+ b ++= b3
+ assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1, 0, 1, 2))
+ }
+
+ test("adding the same buffer to itself") {
+ val b = new CompactBuffer[Int]
+ assert(b.size === 0)
+ assert(b.iterator.toList === Nil)
+ b += 1
+ assert(b.toList === List(1))
+ for (j <- 1 until 8) {
+ b ++= b
+ assert(b.size === (1 << j))
+ assert(b.iterator.toList === (1 to (1 << j)).map(i => 1).toList)
+ }
+ }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
index 88de2c8247..1f7de630e7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -122,6 +122,10 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
val partitioner = new HashPartitioner(input.partitions.size)
val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
- cogrouped.map { case (_, values: Seq[Seq[Double]]) => new DenseVector(values.flatten.toArray) }
+ cogrouped.map {
+ case (_, values: Array[Iterable[_]]) =>
+ val doubles = values.asInstanceOf[Array[Iterable[Double]]]
+ new DenseVector(doubles.flatten.toArray)
+ }
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 40da313189..1a47089e51 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -133,17 +133,17 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
- val mergeValues = (seqOfValues: Seq[Seq[V]]) => {
- if (seqOfValues.size != 1 + numOldValues + numNewValues) {
+ val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
+ if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
throw new Exception("Unexpected number of sequences of reduced values")
}
// Getting reduced values "old time steps" that will be removed from current window
- val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
+ val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
// Getting reduced values "new time steps"
val newValues =
- (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+ (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
- if (seqOfValues(0).isEmpty) {
+ if (arrayOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
throw new Exception("Neither previous window has value for key, nor new values found. " +
@@ -153,7 +153,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
newValues.reduce(reduceF) // return
} else {
// Get the previous window's reduced value
- var tempValue = seqOfValues(0).head
+ var tempValue = arrayOfValues(0).head
// If old values exists, then inverse reduce then from previous value
if (!oldValues.isEmpty) {
tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
@@ -166,7 +166,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
}
}
- val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
+ val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
+ .mapValues(mergeValues)
if (filterFunc.isDefined) {
Some(mergedValuesRDD.filter(filterFunc.get))