aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2014-11-29 20:23:08 -0500
committerPatrick Wendell <pwendell@gmail.com>2014-11-29 20:23:08 -0500
commitc06222427f866fe216d819bbf4eba7b1c834835c (patch)
tree54b3475d50fa1a977558fd73eb48a32d1f4025be /core
parent938dc141ee4448c20441fa9dfa3a9897a11ed4b6 (diff)
downloadspark-c06222427f866fe216d819bbf4eba7b1c834835c.tar.gz
spark-c06222427f866fe216d819bbf4eba7b1c834835c.tar.bz2
spark-c06222427f866fe216d819bbf4eba7b1c834835c.zip
[SPARK-4505][Core] Add a ClassTag parameter to CompactBuffer[T]
Added a ClassTag parameter to CompactBuffer. So CompactBuffer[T] can create primitive arrays for primitive types. It will reduce the memory usage for primitive types significantly and only pay minor performance lost. Here is my test code: ```Scala // Call org.apache.spark.util.SizeEstimator.estimate def estimateSize(obj: AnyRef): Long = { val c = Class.forName("org.apache.spark.util.SizeEstimator$") val f = c.getField("MODULE$") val o = f.get(c) val m = c.getMethod("estimate", classOf[Object]) m.setAccessible(true) m.invoke(o, obj).asInstanceOf[Long] } sc.parallelize(1 to 10000).groupBy(_ => 1).foreach { case (k, v) => println(v.getClass() + " size: " + estimateSize(v)) } ``` Using the previous CompactBuffer outputed ``` class org.apache.spark.util.collection.CompactBuffer size: 313358 ``` Using the new CompactBuffer outputed ``` class org.apache.spark.util.collection.CompactBuffer size: 65712 ``` In this case, the new `CompactBuffer` only used 20% memory of the previous one. It's really helpful for `groupByKey` when using a primitive value. Author: zsxwing <zsxwing@gmail.com> Closes #3378 from zsxwing/SPARK-4505 and squashes the following commits: 4abdbba [zsxwing] Add a ClassTag parameter to reduce the memory usage of CompactBuffer[T] when T is a primitive type
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala18
1 files changed, 10 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
index d44e15e3c9..4d43d8d5cc 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util.collection
+import scala.reflect.ClassTag
+
/**
* An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
* ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
@@ -25,7 +27,7 @@ package org.apache.spark.util.collection
* entries than that. This makes it more efficient for operations like groupBy where we expect
* some keys to have very few elements.
*/
-private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
+private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
// First two elements
private var element0: T = _
private var element1: T = _
@@ -34,7 +36,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
private var curSize = 0
// Array for extra elements
- private var otherElements: Array[AnyRef] = null
+ private var otherElements: Array[T] = null
def apply(position: Int): T = {
if (position < 0 || position >= curSize) {
@@ -45,7 +47,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
} else if (position == 1) {
element1
} else {
- otherElements(position - 2).asInstanceOf[T]
+ otherElements(position - 2)
}
}
@@ -58,7 +60,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
} else if (position == 1) {
element1 = value
} else {
- otherElements(position - 2) = value.asInstanceOf[AnyRef]
+ otherElements(position - 2) = value
}
}
@@ -72,7 +74,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
curSize = 2
} else {
growToSize(curSize + 1)
- otherElements(newIndex - 2) = value.asInstanceOf[AnyRef]
+ otherElements(newIndex - 2) = value
}
this
}
@@ -139,7 +141,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
newArrayLen = Int.MaxValue - 2
}
}
- val newArray = new Array[AnyRef](newArrayLen)
+ val newArray = new Array[T](newArrayLen)
if (otherElements != null) {
System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
}
@@ -150,9 +152,9 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
}
private[spark] object CompactBuffer {
- def apply[T](): CompactBuffer[T] = new CompactBuffer[T]
+ def apply[T: ClassTag](): CompactBuffer[T] = new CompactBuffer[T]
- def apply[T](value: T): CompactBuffer[T] = {
+ def apply[T: ClassTag](value: T): CompactBuffer[T] = {
val buf = new CompactBuffer[T]
buf += value
}