aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormcheah <mcheah@palantir.com>2015-03-17 11:20:20 +0000
committerSean Owen <sowen@cloudera.com>2015-03-17 11:20:20 +0000
commit005d1c5f290decc606a0be59fb191136dafc0c9d (patch)
treea1a4ff6c9bb19f91b96cc958422a041bf3085697
parent25f35806e307c9635e63b8b12698446a14bdd29d (diff)
downloadspark-005d1c5f290decc606a0be59fb191136dafc0c9d.tar.gz
spark-005d1c5f290decc606a0be59fb191136dafc0c9d.tar.bz2
spark-005d1c5f290decc606a0be59fb191136dafc0c9d.zip
[SPARK-6269] [CORE] Use ScalaRunTime's array methods instead of java.lang.reflect.Array in size estimation
This patch switches the usage of java.lang.reflect.Array in Size estimation to using scala's RunTime array-getter methods. The notes on https://bugs.openjdk.java.net/browse/JDK-8051447 tipped me off to the fact that using java.lang.reflect.Array was not ideal. At first, I used the code from that ticket, but it turns out that ScalaRunTime's array-related methods avoid the bottleneck of invoking native code anyways, so that was sufficient to boost performance in size estimation. The idea is to use pure Java code in implementing the methods there, as opposed to relying on native C code which ends up being ill-performing. This improves the performance of estimating the size of arrays when we are checking for spilling in Spark. Here's the benchmark discussion from the ticket: I did two tests. The first, less convincing, take-with-a-block-of-salt test I did was do a simple groupByKey operation to collect objects in a 4.0 GB text file RDD into 30,000 buckets. I ran 1 Master and 4 Spark Worker JVMs on my mac, fetching the RDD from a text file simply stored on disk, and saving it out to another file located on local disk. The wall clock times I got back before and after the change were: Before: 352.195s, 343.871s, 359.080s After (using code directly from the JDK ticket, not the scala code in this PR): 342.929583s, 329.456623s, 326.151481s So, there is a bit of an improvement after the change. I also did some YourKit profiling of the executors to get an idea of how much time was spent in size estimation before and after the change. I roughly saw that size estimation took up less of the time after my change, but YourKit's profiling can be inconsistent and who knows if I was profiling the executors that had the same data between runs? The more convincing test I did was to run the size-estimation logic itself in an isolated unit test. I ran the following code: ``` val bigArray = Array.fill(1000)(Array.fill(1000)(java.util.UUID.randomUUID().toString())) test("String arrays only perf testing") { val startTime = System.currentTimeMillis() for (i <- 1 to 50000) { SizeEstimator.estimate(bigArray) } println("Runtime: " + (System.currentTimeMillis() - startTime) / 1000.0000) } ``` I wanted to use a 2D array specifically because I wanted to measure the performance of repeatedly calling Array.getLength. I used UUID-Strings to ensure that the strings were randomized (so String object re-use doesn't happen), but that they would all be the same size. The results were as follows: Before PR: 222.681 s, 218.34 s, 211.739s After latest change: 170.715 s, 176.775 s, 180.298 s . Author: mcheah <mcheah@palantir.com> Author: Justin Uang <justin.uang@gmail.com> Closes #4972 from mccheah/feature/spark-6269-reflect-array and squashes the following commits: 8527852 [mcheah] Respect CamelCase for numElementsDrawn 18d4b50 [mcheah] Addressing style comments - while loops instead of for loops 16ce534 [mcheah] Organizing imports properly db890ea [mcheah] Removing CastedArray and just using ScalaRunTime. cb67ce2 [mcheah] Fixing a scalastyle error - line too long 5d53c4c [mcheah] Removing unused parameter in visitArray. 6467759 [mcheah] Including primitive size information inside CastedArray. 93f4b05 [mcheah] Using Scala instead of Java for the array-reflection implementation. a557ab8 [mcheah] Using a wrapper around arrays to do casting only once ca063fc [mcheah] Fixing a compiler error made while refactoring style 1fe09de [Justin Uang] [SPARK-6269] Use a different implementation of java.lang.reflect.Array
-rw-r--r--core/src/main/scala/org/apache/spark/util/SizeEstimator.scala28
1 files changed, 15 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index bce3b3afe9..26ffbf9350 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -18,18 +18,16 @@
package org.apache.spark.util
import java.lang.management.ManagementFactory
-import java.lang.reflect.{Array => JArray}
-import java.lang.reflect.Field
-import java.lang.reflect.Modifier
-import java.util.IdentityHashMap
-import java.util.Random
+import java.lang.reflect.{Field, Modifier}
+import java.util.{IdentityHashMap, Random}
import java.util.concurrent.ConcurrentHashMap
-
import scala.collection.mutable.ArrayBuffer
+import scala.runtime.ScalaRunTime
import org.apache.spark.Logging
import org.apache.spark.util.collection.OpenHashSet
+
/**
* Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
* memory-aware caches.
@@ -184,9 +182,9 @@ private[spark] object SizeEstimator extends Logging {
private val ARRAY_SIZE_FOR_SAMPLING = 200
private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
- private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
- val length = JArray.getLength(array)
- val elementClass = cls.getComponentType
+ private def visitArray(array: AnyRef, arrayClass: Class[_], state: SearchState) {
+ val length = ScalaRunTime.array_length(array)
+ val elementClass = arrayClass.getComponentType()
// Arrays have object header and length field which is an integer
var arrSize: Long = alignSize(objectSize + INT_SIZE)
@@ -199,22 +197,26 @@ private[spark] object SizeEstimator extends Logging {
state.size += arrSize
if (length <= ARRAY_SIZE_FOR_SAMPLING) {
- for (i <- 0 until length) {
- state.enqueue(JArray.get(array, i))
+ var arrayIndex = 0
+ while (arrayIndex < length) {
+ state.enqueue(ScalaRunTime.array_apply(array, arrayIndex).asInstanceOf[AnyRef])
+ arrayIndex += 1
}
} else {
// Estimate the size of a large array by sampling elements without replacement.
var size = 0.0
val rand = new Random(42)
val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
- for (i <- 0 until ARRAY_SAMPLE_SIZE) {
+ var numElementsDrawn = 0
+ while (numElementsDrawn < ARRAY_SAMPLE_SIZE) {
var index = 0
do {
index = rand.nextInt(length)
} while (drawn.contains(index))
drawn.add(index)
- val elem = JArray.get(array, index)
+ val elem = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef]
size += SizeEstimator.estimate(elem, state.visited)
+ numElementsDrawn += 1
}
state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
}