diff options
author | zsxwing <zsxwing@gmail.com> | 2014-11-29 20:23:08 -0500 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-11-29 20:23:08 -0500 |
commit | c06222427f866fe216d819bbf4eba7b1c834835c (patch) | |
tree | 54b3475d50fa1a977558fd73eb48a32d1f4025be /core | |
parent | 938dc141ee4448c20441fa9dfa3a9897a11ed4b6 (diff) | |
download | spark-c06222427f866fe216d819bbf4eba7b1c834835c.tar.gz spark-c06222427f866fe216d819bbf4eba7b1c834835c.tar.bz2 spark-c06222427f866fe216d819bbf4eba7b1c834835c.zip |
[SPARK-4505][Core] Add a ClassTag parameter to CompactBuffer[T]
Added a ClassTag parameter to CompactBuffer. So CompactBuffer[T] can create primitive arrays for primitive types. It will reduce the memory usage for primitive types significantly, at the cost of only a minor performance loss.
Here is my test code:
```Scala
// Call org.apache.spark.util.SizeEstimator.estimate
// Reflectively calls org.apache.spark.util.SizeEstimator.estimate(obj). The estimator
// is a private[spark] object, so it is reached via Class.forName rather than a direct
// compile-time reference.
def estimateSize(obj: AnyRef): Long = {
// A Scala `object` compiles to a class whose singleton instance lives in the
// public static field MODULE$.
val c = Class.forName("org.apache.spark.util.SizeEstimator$")
val f = c.getField("MODULE$")
// For a static field, Field.get ignores its argument, so passing the Class is fine.
val o = f.get(c)
val m = c.getMethod("estimate", classOf[Object])
// NOTE(review): getMethod only returns public methods, so this setAccessible call is
// presumably redundant — harmless, but confirm against the SizeEstimator declaration.
m.setAccessible(true)
m.invoke(o, obj).asInstanceOf[Long]
}
sc.parallelize(1 to 10000).groupBy(_ => 1).foreach {
case (k, v) =>
println(v.getClass() + " size: " + estimateSize(v))
}
```
Running this with the previous CompactBuffer produced:
```
class org.apache.spark.util.collection.CompactBuffer size: 313358
```
Running it with the new CompactBuffer produced:
```
class org.apache.spark.util.collection.CompactBuffer size: 65712
```
In this case, the new `CompactBuffer` only used 20% memory of the previous one. It's really helpful for `groupByKey` when using a primitive value.
Author: zsxwing <zsxwing@gmail.com>
Closes #3378 from zsxwing/SPARK-4505 and squashes the following commits:
4abdbba [zsxwing] Add a ClassTag parameter to reduce the memory usage of CompactBuffer[T] when T is a primitive type
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala | 18 |
1 files changed, 10 insertions, 8 deletions
```
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index d44e15e3c9..4d43d8d5cc 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.util.collection
 
+import scala.reflect.ClassTag
+
 /**
  * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
  * ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
@@ -25,7 +27,7 @@ package org.apache.spark.util.collection
  * entries than that. This makes it more efficient for operations like groupBy where we expect
  * some keys to have very few elements.
  */
-private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
+private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
   // First two elements
   private var element0: T = _
   private var element1: T = _
@@ -34,7 +36,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
   private var curSize = 0
 
   // Array for extra elements
-  private var otherElements: Array[AnyRef] = null
+  private var otherElements: Array[T] = null
 
   def apply(position: Int): T = {
     if (position < 0 || position >= curSize) {
@@ -45,7 +47,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
     } else if (position == 1) {
       element1
     } else {
-      otherElements(position - 2).asInstanceOf[T]
+      otherElements(position - 2)
     }
   }
 
@@ -58,7 +60,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
     } else if (position == 1) {
       element1 = value
     } else {
-      otherElements(position - 2) = value.asInstanceOf[AnyRef]
+      otherElements(position - 2) = value
     }
   }
 
@@ -72,7 +74,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
       curSize = 2
     } else {
       growToSize(curSize + 1)
-      otherElements(newIndex - 2) = value.asInstanceOf[AnyRef]
+      otherElements(newIndex - 2) = value
     }
     this
   }
@@ -139,7 +141,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
         newArrayLen = Int.MaxValue - 2
       }
     }
-    val newArray = new Array[AnyRef](newArrayLen)
+    val newArray = new Array[T](newArrayLen)
     if (otherElements != null) {
       System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
     }
@@ -150,9 +152,9 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
 }
 
 private[spark] object CompactBuffer {
-  def apply[T](): CompactBuffer[T] = new CompactBuffer[T]
+  def apply[T: ClassTag](): CompactBuffer[T] = new CompactBuffer[T]
 
-  def apply[T](value: T): CompactBuffer[T] = {
+  def apply[T: ClassTag](value: T): CompactBuffer[T] = {
     val buf = new CompactBuffer[T]
     buf += value
   }
```